Merge branch 'master' of https://github.com/ClickHouse/ClickHouse into fix-local-exceptions

Commit ec7c93728b by kssenii, 2021-10-20 23:42:27 +00:00
179 changed files with 4773 additions and 643 deletions

53
.github/workflows/main.yml vendored Normal file

@ -0,0 +1,53 @@
name: Lightweight GithubActions
on: # yamllint disable-line rule:truthy
pull_request:
types:
- labeled
- unlabeled
- synchronize
- reopened
- opened
branches:
- master
jobs:
CheckLabels:
runs-on: [self-hosted]
steps:
- name: Check out repository code
uses: actions/checkout@v2
- name: Labels check
run: cd $GITHUB_WORKSPACE/tests/ci && python3 run_check.py
DockerHubPush:
needs: CheckLabels
runs-on: [self-hosted]
steps:
- name: Check out repository code
uses: actions/checkout@v2
- name: Images check
run: cd $GITHUB_WORKSPACE/tests/ci && python3 docker_images_check.py
- name: Upload images files to artifacts
uses: actions/upload-artifact@v2
with:
name: changed_images
path: ${{ runner.temp }}/docker_images_check/changed_images.json
StyleCheck:
needs: DockerHubPush
runs-on: [self-hosted]
steps:
- name: Download changed images
uses: actions/download-artifact@v2
with:
name: changed_images
path: ${{ runner.temp }}/style_check
- name: Check out repository code
uses: actions/checkout@v2
- name: Style Check
run: cd $GITHUB_WORKSPACE/tests/ci && python3 style_check.py
FinishCheck:
needs: [StyleCheck, DockerHubPush, CheckLabels]
runs-on: [self-hosted]
steps:
- name: Check out repository code
uses: actions/checkout@v2
- name: Finish label
run: cd $GITHUB_WORKSPACE/tests/ci && python3 finish_check.py

2
.gitmodules vendored

@ -140,7 +140,7 @@
url = https://github.com/ClickHouse-Extras/libc-headers.git
[submodule "contrib/replxx"]
path = contrib/replxx
url = https://github.com/ClickHouse-Extras/replxx.git
url = https://github.com/AmokHuginnsson/replxx.git
[submodule "contrib/avro"]
path = contrib/avro
url = https://github.com/ClickHouse-Extras/avro.git


@ -177,6 +177,10 @@ ReplxxLineReader::ReplxxLineReader(
/// bind C-p/C-n to history-previous/history-next like readline.
rx.bind_key(Replxx::KEY::control('N'), [this](char32_t code) { return rx.invoke(Replxx::ACTION::HISTORY_NEXT, code); });
rx.bind_key(Replxx::KEY::control('P'), [this](char32_t code) { return rx.invoke(Replxx::ACTION::HISTORY_PREVIOUS, code); });
/// bind C-j to ENTER action.
rx.bind_key(Replxx::KEY::control('J'), [this](char32_t code) { return rx.invoke(Replxx::ACTION::COMMIT_LINE, code); });
/// By default COMPLETE_NEXT/COMPLETE_PREV were bound to C-p/C-n; re-bind them
/// to M-P/M-N (these were used for HISTORY_COMMON_PREFIX_SEARCH before, but
/// that action is also bound to M-p/M-n).


@ -34,8 +34,6 @@ endif()
if (CAPNP_LIBRARIES)
set (USE_CAPNP 1)
elseif(NOT MISSING_INTERNAL_CAPNP_LIBRARY)
add_subdirectory(contrib/capnproto-cmake)
set (CAPNP_LIBRARIES capnpc)
set (USE_CAPNP 1)
set (USE_INTERNAL_CAPNP_LIBRARY 1)


@ -1,16 +1,5 @@
# Third-party libraries may have substandard code.
# Put all targets defined here and in added subfolders under "contrib/" folder in GUI-based IDEs by default.
# Some of third-party projects may override CMAKE_FOLDER or FOLDER property of their targets, so they will
# appear not in "contrib/" as originally planned here.
get_filename_component (_current_dir_name "${CMAKE_CURRENT_LIST_DIR}" NAME)
if (CMAKE_FOLDER)
set (CMAKE_FOLDER "${CMAKE_FOLDER}/${_current_dir_name}")
else ()
set (CMAKE_FOLDER "${_current_dir_name}")
endif ()
unset (_current_dir_name)
set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -w")
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -w")
@ -49,6 +38,10 @@ add_subdirectory (replxx-cmake)
add_subdirectory (unixodbc-cmake)
add_subdirectory (nanodbc-cmake)
if (USE_INTERNAL_CAPNP_LIBRARY AND NOT MISSING_INTERNAL_CAPNP_LIBRARY)
add_subdirectory(capnproto-cmake)
endif ()
if (ENABLE_FUZZING)
add_subdirectory (libprotobuf-mutator-cmake)
endif()
@ -352,3 +345,76 @@ endif()
if (USE_S2_GEOMETRY)
add_subdirectory(s2geometry-cmake)
endif()
# Put all targets defined here and in subdirectories under "contrib/<immediate-subdir>" folders in GUI-based IDEs.
# Some third-party projects may override CMAKE_FOLDER or FOLDER property of their targets, so they would not appear
# in "contrib/..." as originally planned, so we work around this by fixing the FOLDER property of all targets manually,
# instead of controlling it via CMAKE_FOLDER.
function (ensure_target_rooted_in _target _folder)
# Skip INTERFACE library targets, since FOLDER property is not available for them.
get_target_property (_target_type "${_target}" TYPE)
if (_target_type STREQUAL "INTERFACE_LIBRARY")
return ()
endif ()
# Read the original FOLDER property value, if any.
get_target_property (_folder_prop "${_target}" FOLDER)
# Normalize that value, so we avoid possible repetitions in folder names.
if (NOT _folder_prop)
set (_folder_prop "")
endif ()
if (CMAKE_FOLDER AND _folder_prop MATCHES "^${CMAKE_FOLDER}/(.*)\$")
set (_folder_prop "${CMAKE_MATCH_1}")
endif ()
if (_folder AND _folder_prop MATCHES "^${_folder}/(.*)\$")
set (_folder_prop "${CMAKE_MATCH_1}")
endif ()
if (_folder)
set (_folder_prop "${_folder}/${_folder_prop}")
endif ()
if (CMAKE_FOLDER)
set (_folder_prop "${CMAKE_FOLDER}/${_folder_prop}")
endif ()
# Set the updated FOLDER property value back.
set_target_properties ("${_target}" PROPERTIES FOLDER "${_folder_prop}")
endfunction ()
function (ensure_own_targets_are_rooted_in _dir _folder)
get_directory_property (_targets DIRECTORY "${_dir}" BUILDSYSTEM_TARGETS)
foreach (_target IN LISTS _targets)
ensure_target_rooted_in ("${_target}" "${_folder}")
endforeach ()
endfunction ()
function (ensure_all_targets_are_rooted_in _dir _folder)
ensure_own_targets_are_rooted_in ("${_dir}" "${_folder}")
get_property (_sub_dirs DIRECTORY "${_dir}" PROPERTY SUBDIRECTORIES)
foreach (_sub_dir IN LISTS _sub_dirs)
ensure_all_targets_are_rooted_in ("${_sub_dir}" "${_folder}")
endforeach ()
endfunction ()
function (organize_ide_folders_2_level _dir)
get_filename_component (_dir_name "${_dir}" NAME)
ensure_own_targets_are_rooted_in ("${_dir}" "${_dir_name}")
# Note that we respect only the first two levels of nesting; we don't want to
# reorganize target folders further within each third-party dir.
get_property (_sub_dirs DIRECTORY "${_dir}" PROPERTY SUBDIRECTORIES)
foreach (_sub_dir IN LISTS _sub_dirs)
get_filename_component (_sub_dir_name "${_sub_dir}" NAME)
ensure_all_targets_are_rooted_in ("${_sub_dir}" "${_dir_name}/${_sub_dir_name}")
endforeach ()
endfunction ()
organize_ide_folders_2_level ("${CMAKE_CURRENT_LIST_DIR}")

2
contrib/capnproto vendored

@ -1 +1 @@
Subproject commit a00ccd91b3746ef2ab51d40fe3265829949d1ace
Subproject commit c8189ec3c27dacbd4a3288e682473010e377f593


@ -45,6 +45,7 @@ set (CAPNP_SRCS
"${CAPNPROTO_SOURCE_DIR}/capnp/serialize-packed.c++"
"${CAPNPROTO_SOURCE_DIR}/capnp/schema.c++"
"${CAPNPROTO_SOURCE_DIR}/capnp/stream.capnp.c++"
"${CAPNPROTO_SOURCE_DIR}/capnp/schema-loader.c++"
"${CAPNPROTO_SOURCE_DIR}/capnp/dynamic.c++"
"${CAPNPROTO_SOURCE_DIR}/capnp/stringify.c++"
@ -63,6 +64,7 @@ set (CAPNPC_SRCS
"${CAPNPROTO_SOURCE_DIR}/capnp/compiler/lexer.c++"
"${CAPNPROTO_SOURCE_DIR}/capnp/compiler/grammar.capnp.c++"
"${CAPNPROTO_SOURCE_DIR}/capnp/compiler/parser.c++"
"${CAPNPROTO_SOURCE_DIR}/capnp/compiler/generics.c++"
"${CAPNPROTO_SOURCE_DIR}/capnp/compiler/node-translator.c++"
"${CAPNPROTO_SOURCE_DIR}/capnp/compiler/compiler.c++"
"${CAPNPROTO_SOURCE_DIR}/capnp/schema-parser.c++"


@ -47,6 +47,7 @@ set(SRCS
)
add_library(cxx ${SRCS})
set_target_properties(cxx PROPERTIES FOLDER "contrib/libcxx-cmake")
target_include_directories(cxx SYSTEM BEFORE PUBLIC $<BUILD_INTERFACE:${LIBCXX_SOURCE_DIR}/include>)
target_compile_definitions(cxx PRIVATE -D_LIBCPP_BUILDING_LIBRARY -DLIBCXX_BUILDING_LIBCXXABI)


@ -22,6 +22,7 @@ set(SRCS
)
add_library(cxxabi ${SRCS})
set_target_properties(cxxabi PROPERTIES FOLDER "contrib/libcxxabi-cmake")
# Third party library may have substandard code.
target_compile_options(cxxabi PRIVATE -w)


@ -39,6 +39,7 @@ set(LIBUNWIND_SOURCES
${LIBUNWIND_ASM_SOURCES})
add_library(unwind ${LIBUNWIND_SOURCES})
set_target_properties(unwind PROPERTIES FOLDER "contrib/libunwind-cmake")
target_include_directories(unwind SYSTEM BEFORE PUBLIC $<BUILD_INTERFACE:${LIBUNWIND_SOURCE_DIR}/include>)
target_compile_definitions(unwind PRIVATE -D_LIBUNWIND_NO_HEAP=1 -D_DEBUG -D_LIBUNWIND_IS_NATIVE_ONLY)

2
contrib/replxx vendored

@ -1 +1 @@
Subproject commit f97765df14f4a6236d69b8f14b53ef2051ebd95a
Subproject commit b0c266c2d8a835784181e17292b421848c78c6b8


@ -1,16 +1,22 @@
# docker build -t clickhouse/kerberized-hadoop .
FROM sequenceiq/hadoop-docker:2.7.0
RUN sed -i -e 's/^\#baseurl/baseurl/' /etc/yum.repos.d/CentOS-Base.repo
RUN sed -i -e 's/^mirrorlist/#mirrorlist/' /etc/yum.repos.d/CentOS-Base.repo
RUN sed -i -e 's#http://mirror.centos.org/#http://vault.centos.org/#' /etc/yum.repos.d/CentOS-Base.repo
RUN sed -i -e 's/^\#baseurl/baseurl/' /etc/yum.repos.d/CentOS-Base.repo && \
sed -i -e 's/^mirrorlist/#mirrorlist/' /etc/yum.repos.d/CentOS-Base.repo && \
sed -i -e 's#http://mirror.centos.org/#http://vault.centos.org/#' /etc/yum.repos.d/CentOS-Base.repo
# https://community.letsencrypt.org/t/rhel-centos-6-openssl-client-compatibility-after-dst-root-ca-x3-expiration/161032/81
RUN sed -i s/xMDkzMDE0MDExNVow/0MDkzMDE4MTQwM1ow/ /etc/pki/tls/certs/ca-bundle.crt
RUN yum clean all && \
rpm --rebuilddb && \
yum -y update && \
yum -y install yum-plugin-ovl && \
yum --quiet -y install krb5-workstation.x86_64
RUN cd /tmp && \
curl http://archive.apache.org/dist/commons/daemon/source/commons-daemon-1.0.15-src.tar.gz -o commons-daemon-1.0.15-src.tar.gz && \
curl http://archive.apache.org/dist/commons/daemon/source/commons-daemon-1.0.15-src.tar.gz -o commons-daemon-1.0.15-src.tar.gz && \
tar xzf commons-daemon-1.0.15-src.tar.gz && \
cd commons-daemon-1.0.15-src/src/native/unix && \
./configure && \


@ -37,7 +37,9 @@ RUN set -x \
|| echo "WARNING: Some file was just downloaded from the internet without any validation and we are installing it into the system"; } \
&& dpkg -i "${PKG_VERSION}.deb"
CMD echo "Running PVS version $PKG_VERSION" && cd /repo_folder && pvs-studio-analyzer credentials $LICENCE_NAME $LICENCE_KEY -o ./licence.lic \
ENV CCACHE_DIR=/test_output/ccache
CMD echo "Running PVS version $PKG_VERSION" && mkdir -p $CCACHE_DIR && cd /repo_folder && pvs-studio-analyzer credentials $LICENCE_NAME $LICENCE_KEY -o ./licence.lic \
&& cmake . -D"ENABLE_EMBEDDED_COMPILER"=OFF -D"USE_INTERNAL_PROTOBUF_LIBRARY"=OFF -D"USE_INTERNAL_GRPC_LIBRARY"=OFF -DCMAKE_C_COMPILER=clang-13 -DCMAKE_CXX_COMPILER=clang\+\+-13 \
&& ninja re2_st clickhouse_grpc_protos \
&& pvs-studio-analyzer analyze -o pvs-studio.log -e contrib -j 4 -l ./licence.lic; \


@ -1,5 +1,7 @@
#!/bin/bash
# yaml check is not the best one
cd /ClickHouse/utils/check-style || echo -e "failure\tRepo not found" > /test_output/check_status.tsv
./check-style -n |& tee /test_output/style_output.txt
./check-typos |& tee /test_output/typos_output.txt


@ -128,6 +128,8 @@ You can pass parameters to `clickhouse-client` (all parameters have a default va
- `--history_file` — Path to a file containing command history.
- `--param_<name>` — Value for a [query with parameters](#cli-queries-with-parameters).
- `--hardware-utilization` — Print hardware utilization information in the progress bar.
- `--print-profile-events` — Print `ProfileEvents` packets.
- `--profile-events-delay-ms` — Delay between printing `ProfileEvents` packets (-1 - print only totals, 0 - print every single packet).
Since version 20.5, `clickhouse-client` has automatic syntax highlighting (always enabled).
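For example, a minimal invocation combining both new options might look like this (the query is only a placeholder; a reachable server with default settings is assumed):

```bash
# Print ProfileEvents packets at most once per second while the query runs.
clickhouse-client --print-profile-events --profile-events-delay-ms=1000 \
    --query "SELECT count() FROM system.numbers LIMIT 100000000"
```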


A subquery is another `SELECT` query that may be specified in parentheses inside the `FROM` clause.
When `FINAL` is specified, ClickHouse fully merges the data before returning the result and thus performs all data transformations that happen during merges for the given table engine.
It is applicable when selecting data from tables that use the [MergeTree](../../../engines/table-engines/mergetree-family/mergetree.md)-engine family (except `GraphiteMergeTree`). Also supported for:
It is applicable when selecting data from tables that use the [MergeTree](../../../engines/table-engines/mergetree-family/mergetree.md)-engine family. Also supported for:
- [Replicated](../../../engines/table-engines/mergetree-family/replication.md) versions of `MergeTree` engines.
- [View](../../../engines/table-engines/special/view.md), [Buffer](../../../engines/table-engines/special/buffer.md), [Distributed](../../../engines/table-engines/special/distributed.md), and [MaterializedView](../../../engines/table-engines/special/materializedview.md) engines that operate over other engines, provided they were created over `MergeTree`-engine tables.
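For example, a query using the modifier might look like this (the table name `visits_final_demo` is a placeholder; assume it uses a `ReplacingMergeTree` engine):

```bash
# FINAL merges the selected data parts before returning rows, so the result
# already reflects merge-time transformations (e.g. replacement of duplicates).
clickhouse-client --query "SELECT CounterID, count() FROM visits_final_demo FINAL GROUP BY CounterID"
```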


@ -20,7 +20,7 @@ toc_title: FROM
If the `FINAL` modifier is used in a query, ClickHouse fully merges the data before returning the result, thereby performing all the data transformations that the table engine performs during merges.
It is applicable when selecting data from tables that use the [MergeTree](../../../engines/table-engines/mergetree-family/mergetree.md) family of engines (except `GraphiteMergeTree`). Also supported for:
It is applicable when selecting data from tables that use the [MergeTree](../../../engines/table-engines/mergetree-family/mergetree.md) family of engines. Also supported for:
- [Replicated](../../../engines/table-engines/mergetree-family/replication.md) versions of `MergeTree` engines.
- [View](../../../engines/table-engines/special/view.md), [Buffer](../../../engines/table-engines/special/buffer.md), [Distributed](../../../engines/table-engines/special/distributed.md), and [MaterializedView](../../../engines/table-engines/special/materializedview.md) engines that operate over other engines, provided they were created over tables with `MergeTree`-family engines.


@ -20,7 +20,7 @@ toc_title: FROM
When `FINAL` is specified, ClickHouse fully merges the data before returning the result, thereby performing all the data transformations that happen during merges for the given table engine.
It is applicable when selecting data from tables that use the [MergeTree](../../../engines/table-engines/mergetree-family/mergetree.md) engine family (except `GraphiteMergeTree`). Also supported for:
It is applicable when selecting data from tables that use the [MergeTree](../../../engines/table-engines/mergetree-family/mergetree.md) engine family. Also supported for:
- [Replicated](../../../engines/table-engines/mergetree-family/replication.md) versions of `MergeTree` engines
- [View](../../../engines/table-engines/special/view.md), [Buffer](../../../engines/table-engines/special/buffer.md), [Distributed](../../../engines/table-engines/special/distributed.md), and [MaterializedView](../../../engines/table-engines/special/materializedview.md) engines that operate over other engines, as long as their underlying tables use the `MergeTree` engine family.


@ -25,9 +25,6 @@
#endif
#include <Common/Exception.h>
#include <Common/formatReadable.h>
#include <Common/NetException.h>
#include <Common/Config/ConfigProcessor.h>
#include <Common/PODArray.h>
#include <Common/TerminalSize.h>
#include <Common/Config/configReadClient.h>
#include "Common/MemoryTracker.h"
@ -35,13 +32,11 @@
#include <Core/QueryProcessingStage.h>
#include <Client/TestHint.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <Poco/Util/Application.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <IO/Operators.h>
#include <IO/WriteBufferFromOStream.h>
#include <IO/UseSSL.h>
@ -51,9 +46,6 @@
#include <Parsers/ASTUseQuery.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/formatAST.h>
#include <Interpreters/InterpreterSetQuery.h>
@ -86,7 +78,6 @@ namespace ErrorCodes
extern const int SYNTAX_ERROR;
extern const int TOO_DEEP_RECURSION;
extern const int NETWORK_ERROR;
extern const int UNRECOGNIZED_ARGUMENTS;
extern const int AUTHENTICATION_FAILED;
}
@ -992,7 +983,7 @@ void Client::printHelpMessage(const OptionsDescription & options_description)
}
void Client::addAndCheckOptions(OptionsDescription & options_description, po::variables_map & options, Arguments & arguments)
void Client::addOptions(OptionsDescription & options_description)
{
/// Main commandline options related to client functionality and all parameters from Settings.
options_description.main_description->add_options()
@ -1049,14 +1040,6 @@ void Client::addAndCheckOptions(OptionsDescription & options_description, po::va
(
"types", po::value<std::string>(), "types"
);
cmd_settings.addProgramOptions(options_description.main_description.value());
/// Parse main commandline options.
po::parsed_options parsed = po::command_line_parser(arguments).options(options_description.main_description.value()).run();
auto unrecognized_options = po::collect_unrecognized(parsed.options, po::collect_unrecognized_mode::include_positional);
if (unrecognized_options.size() > 1)
throw Exception(ErrorCodes::UNRECOGNIZED_ARGUMENTS, "Unrecognized option '{}'", unrecognized_options[1]);
po::store(parsed, options);
}
@ -1235,16 +1218,16 @@ int mainEntryClickHouseClient(int argc, char ** argv)
client.init(argc, argv);
return client.run();
}
catch (const boost::program_options::error & e)
{
std::cerr << "Bad arguments: " << e.what() << std::endl;
return 1;
}
catch (const DB::Exception & e)
{
std::cerr << DB::getExceptionMessage(e, false) << std::endl;
return 1;
}
catch (const boost::program_options::error & e)
{
std::cerr << "Bad arguments: " << e.what() << std::endl;
return DB::ErrorCodes::BAD_ARGUMENTS;
}
catch (...)
{
std::cerr << DB::getCurrentExceptionMessage(true) << std::endl;


@ -24,7 +24,7 @@ protected:
String getName() const override { return "client"; }
void printHelpMessage(const OptionsDescription & options_description) override;
void addAndCheckOptions(OptionsDescription & options_description, po::variables_map & options, Arguments & arguments) override;
void addOptions(OptionsDescription & options_description) override;
void processOptions(const OptionsDescription & options_description, const CommandLineOptions & options,
const std::vector<Arguments> & external_tables_arguments) override;
void processConfig() override;


@ -25,6 +25,8 @@
#include <Storages/StorageFactory.h>
#include <Storages/registerStorages.h>
#include <DataTypes/DataTypeFactory.h>
#include <Formats/FormatFactory.h>
#include <Formats/registerFormats.h>
#pragma GCC diagnostic ignored "-Wunused-function"
@ -114,6 +116,7 @@ int mainEntryClickHouseFormat(int argc, char ** argv)
registerAggregateFunctions();
registerTableFunctions();
registerStorages();
registerFormats();
std::unordered_set<std::string> additional_names;
@ -130,6 +133,8 @@ int mainEntryClickHouseFormat(int argc, char ** argv)
return FunctionFactory::instance().has(what)
|| AggregateFunctionFactory::instance().isAggregateFunctionName(what)
|| TableFunctionFactory::instance().isTableFunctionName(what)
|| FormatFactory::instance().isOutputFormat(what)
|| FormatFactory::instance().isInputFormat(what)
|| additional_names.count(what);
};


@ -1,8 +1,6 @@
#include "LocalServer.h"
#include <Poco/Util/XMLConfiguration.h>
#include <Poco/Util/HelpFormatter.h>
#include <Poco/Util/OptionCallback.h>
#include <Poco/String.h>
#include <Poco/Logger.h>
#include <Poco/NullChannel.h>
@ -10,7 +8,6 @@
#include <Storages/System/attachSystemTables.h>
#include <Storages/System/attachInformationSchemaTables.h>
#include <Interpreters/ProcessList.h>
#include <Interpreters/executeQuery.h>
#include <Interpreters/loadMetadata.h>
#include <Interpreters/DatabaseCatalog.h>
#include <base/getFQDNOrHostName.h>
@ -20,17 +17,12 @@
#include <Common/Exception.h>
#include <Common/Macros.h>
#include <Common/Config/ConfigProcessor.h>
#include <Common/escapeForFileName.h>
#include <Common/ClickHouseRevision.h>
#include <Common/ThreadStatus.h>
#include <Common/UnicodeBar.h>
#include <Common/config_version.h>
#include <Common/quoteString.h>
#include <loggers/Loggers.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/ReadHelpers.h>
#include <IO/UseSSL.h>
#include <Parsers/IAST.h>
#include <base/ErrorHandlers.h>
@ -42,9 +34,7 @@
#include <Disks/registerDisks.h>
#include <Formats/registerFormats.h>
#include <boost/program_options/options_description.hpp>
#include <boost/program_options.hpp>
#include <base/argsToConfig.h>
#include <Common/TerminalSize.h>
#include <Common/randomSeed.h>
#include <filesystem>
@ -519,19 +509,16 @@ void LocalServer::processConfig()
format = config().getString("output-format", config().getString("format", is_interactive ? "PrettyCompact" : "TSV"));
insert_format = "Values";
/// Setting value from cmd arg overrides one from config
if (global_context->getSettingsRef().max_insert_block_size.changed)
insert_format_max_block_size = global_context->getSettingsRef().max_insert_block_size;
else
insert_format_max_block_size = config().getInt("insert_format_max_block_size", global_context->getSettingsRef().max_insert_block_size);
/// Skip networking
/// Sets external authenticators config (LDAP, Kerberos).
global_context->setExternalAuthenticatorsConfig(config());
global_context->initializeBackgroundExecutors();
setupUsers();
/// Limit on total number of concurrently executing queries.
@ -667,7 +654,7 @@ void LocalServer::printHelpMessage(const OptionsDescription & options_descriptio
}
void LocalServer::addAndCheckOptions(OptionsDescription & options_description, po::variables_map & options, Arguments & arguments)
void LocalServer::addOptions(OptionsDescription & options_description)
{
options_description.main_description->add_options()
("database,d", po::value<std::string>(), "database")
@ -685,11 +672,8 @@ void LocalServer::addAndCheckOptions(OptionsDescription & options_description, p
("logger.level", po::value<std::string>(), "Log level")
("no-system-tables", "do not attach system tables (better startup time)")
("path", po::value<std::string>(), "Storage path")
;
cmd_settings.addProgramOptions(options_description.main_description.value());
po::parsed_options parsed = po::command_line_parser(arguments).options(options_description.main_description.value()).run();
po::store(parsed, options);
}
@ -744,6 +728,17 @@ int mainEntryClickHouseLocal(int argc, char ** argv)
app.init(argc, argv);
return app.run();
}
catch (const DB::Exception & e)
{
std::cerr << DB::getExceptionMessage(e, false) << std::endl;
auto code = DB::getCurrentExceptionCode();
return code ? code : 1;
}
catch (const boost::program_options::error & e)
{
std::cerr << "Bad arguments: " << e.what() << std::endl;
return DB::ErrorCodes::BAD_ARGUMENTS;
}
catch (...)
{
std::cerr << DB::getCurrentExceptionMessage(true) << '\n';


@ -40,7 +40,7 @@ protected:
String getQueryTextPrefix() override;
void printHelpMessage(const OptionsDescription & options_description) override;
void addAndCheckOptions(OptionsDescription & options_description, po::variables_map & options, Arguments & arguments) override;
void addOptions(OptionsDescription & options_description) override;
void processOptions(const OptionsDescription & options_description, const CommandLineOptions & options,
const std::vector<Arguments> &) override;
void processConfig() override;


@ -919,7 +919,7 @@ if (ThreadFuzzer::instance().isEffective())
/// Initialize background executors after we load default_profile config.
/// This is needed to load proper values of background_pool_size etc.
global_context->initializeBackgroundExecutors();
global_context->initializeBackgroundExecutorsIfNeeded();
if (settings.async_insert_threads)
global_context->setAsynchronousInsertQueue(std::make_shared<AsynchronousInsertQueue>(


@ -71,6 +71,7 @@ namespace ErrorCodes
extern const int UNEXPECTED_PACKET_FROM_SERVER;
extern const int INVALID_USAGE_OF_INPUT;
extern const int CANNOT_SET_SIGNAL_HANDLER;
extern const int UNRECOGNIZED_ARGUMENTS;
}
}
@ -266,7 +267,7 @@ void ClientBase::onLogData(Block & block)
{
initLogsOutputStream();
progress_indication.clearProgressOutput();
logs_out_stream->write(block);
logs_out_stream->writeLogs(block);
logs_out_stream->flush();
}
@ -668,39 +669,61 @@ void ClientBase::onEndOfStream()
void ClientBase::onProfileEvents(Block & block)
{
const auto rows = block.rows();
if (rows == 0 || !progress_indication.print_hardware_utilization)
if (rows == 0)
return;
const auto & array_thread_id = typeid_cast<const ColumnUInt64 &>(*block.getByName("thread_id").column).getData();
const auto & names = typeid_cast<const ColumnString &>(*block.getByName("name").column);
const auto & host_names = typeid_cast<const ColumnString &>(*block.getByName("host_name").column);
const auto & array_values = typeid_cast<const ColumnUInt64 &>(*block.getByName("value").column).getData();
const auto * user_time_name = ProfileEvents::getName(ProfileEvents::UserTimeMicroseconds);
const auto * system_time_name = ProfileEvents::getName(ProfileEvents::SystemTimeMicroseconds);
HostToThreadTimesMap thread_times;
for (size_t i = 0; i < rows; ++i)
if (progress_indication.print_hardware_utilization)
{
auto thread_id = array_thread_id[i];
auto host_name = host_names.getDataAt(i).toString();
if (thread_id != 0)
progress_indication.addThreadIdToList(host_name, thread_id);
auto event_name = names.getDataAt(i);
auto value = array_values[i];
if (event_name == user_time_name)
const auto & array_thread_id = typeid_cast<const ColumnUInt64 &>(*block.getByName("thread_id").column).getData();
const auto & names = typeid_cast<const ColumnString &>(*block.getByName("name").column);
const auto & host_names = typeid_cast<const ColumnString &>(*block.getByName("host_name").column);
const auto & array_values = typeid_cast<const ColumnUInt64 &>(*block.getByName("value").column).getData();
const auto * user_time_name = ProfileEvents::getName(ProfileEvents::UserTimeMicroseconds);
const auto * system_time_name = ProfileEvents::getName(ProfileEvents::SystemTimeMicroseconds);
HostToThreadTimesMap thread_times;
for (size_t i = 0; i < rows; ++i)
{
thread_times[host_name][thread_id].user_ms = value;
auto thread_id = array_thread_id[i];
auto host_name = host_names.getDataAt(i).toString();
if (thread_id != 0)
progress_indication.addThreadIdToList(host_name, thread_id);
auto event_name = names.getDataAt(i);
auto value = array_values[i];
if (event_name == user_time_name)
{
thread_times[host_name][thread_id].user_ms = value;
}
else if (event_name == system_time_name)
{
thread_times[host_name][thread_id].system_ms = value;
}
else if (event_name == MemoryTracker::USAGE_EVENT_NAME)
{
thread_times[host_name][thread_id].memory_usage = value;
}
}
else if (event_name == system_time_name)
progress_indication.updateThreadEventData(thread_times);
}
if (profile_events.print)
{
if (profile_events.watch.elapsedMilliseconds() >= profile_events.delay_ms)
{
thread_times[host_name][thread_id].system_ms = value;
initLogsOutputStream();
progress_indication.clearProgressOutput();
logs_out_stream->writeProfileEvents(block);
logs_out_stream->flush();
profile_events.watch.restart();
profile_events.last_block = {};
}
else if (event_name == MemoryTracker::USAGE_EVENT_NAME)
else
{
thread_times[host_name][thread_id].memory_usage = value;
profile_events.last_block = block;
}
}
progress_indication.updateThreadEventData(thread_times);
}
@ -1023,6 +1046,7 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
processed_rows = 0;
written_first_block = false;
progress_indication.resetProgress();
profile_events.watch.restart();
{
/// Temporarily apply query settings to context.
@ -1091,6 +1115,15 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
}
}
/// Always print last block (if it was not printed already)
if (profile_events.last_block)
{
initLogsOutputStream();
progress_indication.clearProgressOutput();
logs_out_stream->writeProfileEvents(profile_events.last_block);
logs_out_stream->flush();
}
if (is_interactive)
{
std::cout << std::endl << processed_rows << " rows in set. Elapsed: " << progress_indication.elapsedSeconds() << " sec. ";
@ -1503,6 +1536,26 @@ void ClientBase::readArguments(int argc, char ** argv, Arguments & common_argume
}
}
void ClientBase::parseAndCheckOptions(OptionsDescription & options_description, po::variables_map & options, Arguments & arguments)
{
cmd_settings.addProgramOptions(options_description.main_description.value());
/// Parse main commandline options.
auto parser = po::command_line_parser(arguments).options(options_description.main_description.value()).allow_unregistered();
po::parsed_options parsed = parser.run();
/// Check unrecognized options without positional options.
auto unrecognized_options = po::collect_unrecognized(parsed.options, po::collect_unrecognized_mode::exclude_positional);
if (!unrecognized_options.empty())
throw Exception(ErrorCodes::UNRECOGNIZED_ARGUMENTS, "Unrecognized option '{}'", unrecognized_options[0]);
/// Check positional options (options after ' -- ', ex: clickhouse-client -- <options>).
unrecognized_options = po::collect_unrecognized(parsed.options, po::collect_unrecognized_mode::include_positional);
if (unrecognized_options.size() > 1)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Positional options are not supported.");
po::store(parsed, options);
}
void ClientBase::init(int argc, char ** argv)
{
@ -1559,9 +1612,12 @@ void ClientBase::init(int argc, char ** argv)
("ignore-error", "do not stop processing in multiquery mode")
("stacktrace", "print stack traces of exceptions")
("hardware-utilization", "print hardware utilization information in progress bar")
("print-profile-events", po::value(&profile_events.print)->zero_tokens(), "Printing ProfileEvents packets")
("profile-events-delay-ms", po::value<UInt64>()->default_value(profile_events.delay_ms), "Delay between printing `ProfileEvents` packets (-1 - print only totals, 0 - print every single packet)")
;
addAndCheckOptions(options_description, options, common_arguments);
addOptions(options_description);
parseAndCheckOptions(options_description, options, common_arguments);
po::notify(options);
if (options.count("version") || options.count("V"))
@ -1609,6 +1665,10 @@ void ClientBase::init(int argc, char ** argv)
config().setBool("vertical", true);
if (options.count("stacktrace"))
config().setBool("stacktrace", true);
if (options.count("print-profile-events"))
config().setBool("print-profile-events", true);
if (options.count("profile-events-delay-ms"))
config().setInt("profile-events-delay-ms", options["profile-events-delay-ms"].as<UInt64>());
if (options.count("progress"))
config().setBool("progress", true);
if (options.count("echo"))
@ -1629,6 +1689,8 @@ void ClientBase::init(int argc, char ** argv)
progress_indication.print_hardware_utilization = true;
query_processing_stage = QueryProcessingStage::fromString(options["stage"].as<std::string>());
profile_events.print = options.count("print-profile-events");
profile_events.delay_ms = options["profile-events-delay-ms"].as<UInt64>();
processOptions(options_description, options, external_tables_arguments);
argsToConfig(common_arguments, config(), 100);


@ -3,6 +3,7 @@
#include <Common/ProgressIndication.h>
#include <Common/InterruptListener.h>
#include <Common/ShellCommand.h>
#include <Common/Stopwatch.h>
#include <Core/ExternalTable.h>
#include <Poco/Util/Application.h>
#include <Interpreters/Context.h>
@ -91,7 +92,7 @@ protected:
};
virtual void printHelpMessage(const OptionsDescription & options_description) = 0;
virtual void addAndCheckOptions(OptionsDescription & options_description, po::variables_map & options, Arguments & arguments) = 0;
virtual void addOptions(OptionsDescription & options_description) = 0;
virtual void processOptions(const OptionsDescription & options_description,
const CommandLineOptions & options,
const std::vector<Arguments> & external_tables_arguments) = 0;
@ -132,6 +133,7 @@ private:
void resetOutput();
void outputQueryInfo(bool echo_query_);
void readArguments(int argc, char ** argv, Arguments & common_arguments, std::vector<Arguments> & external_tables_arguments);
void parseAndCheckOptions(OptionsDescription & options_description, po::variables_map & options, Arguments & arguments);
protected:
bool is_interactive = false; /// Use either interactive line editing interface or batch mode.
@ -218,6 +220,16 @@ protected:
QueryFuzzer fuzzer;
int query_fuzzer_runs = 0;
struct
{
bool print = false;
/// UINT64_MAX -- print only last
UInt64 delay_ms = 0;
Stopwatch watch;
/// For printing only last (delay_ms == 0).
Block last_block;
} profile_events;
QueryProcessingStage::Enum query_processing_stage;
};


@ -109,29 +109,29 @@ void highlight(const String & query, std::vector<replxx::Replxx::Color> & colors
{TokenType::OpeningSquareBracket, Replxx::Color::BROWN},
{TokenType::ClosingSquareBracket, Replxx::Color::BROWN},
{TokenType::DoubleColon, Replxx::Color::BROWN},
{TokenType::OpeningCurlyBrace, Replxx::Color::INTENSE},
{TokenType::ClosingCurlyBrace, Replxx::Color::INTENSE},
{TokenType::OpeningCurlyBrace, replxx::color::bold(Replxx::Color::DEFAULT)},
{TokenType::ClosingCurlyBrace, replxx::color::bold(Replxx::Color::DEFAULT)},
{TokenType::Comma, Replxx::Color::INTENSE},
{TokenType::Semicolon, Replxx::Color::INTENSE},
{TokenType::Dot, Replxx::Color::INTENSE},
{TokenType::Asterisk, Replxx::Color::INTENSE},
{TokenType::Comma, replxx::color::bold(Replxx::Color::DEFAULT)},
{TokenType::Semicolon, replxx::color::bold(Replxx::Color::DEFAULT)},
{TokenType::Dot, replxx::color::bold(Replxx::Color::DEFAULT)},
{TokenType::Asterisk, replxx::color::bold(Replxx::Color::DEFAULT)},
{TokenType::HereDoc, Replxx::Color::CYAN},
{TokenType::Plus, Replxx::Color::INTENSE},
{TokenType::Minus, Replxx::Color::INTENSE},
{TokenType::Slash, Replxx::Color::INTENSE},
{TokenType::Percent, Replxx::Color::INTENSE},
{TokenType::Arrow, Replxx::Color::INTENSE},
{TokenType::QuestionMark, Replxx::Color::INTENSE},
{TokenType::Colon, Replxx::Color::INTENSE},
{TokenType::Equals, Replxx::Color::INTENSE},
{TokenType::NotEquals, Replxx::Color::INTENSE},
{TokenType::Less, Replxx::Color::INTENSE},
{TokenType::Greater, Replxx::Color::INTENSE},
{TokenType::LessOrEquals, Replxx::Color::INTENSE},
{TokenType::GreaterOrEquals, Replxx::Color::INTENSE},
{TokenType::Concatenation, Replxx::Color::INTENSE},
{TokenType::At, Replxx::Color::INTENSE},
{TokenType::Plus, replxx::color::bold(Replxx::Color::DEFAULT)},
{TokenType::Minus, replxx::color::bold(Replxx::Color::DEFAULT)},
{TokenType::Slash, replxx::color::bold(Replxx::Color::DEFAULT)},
{TokenType::Percent, replxx::color::bold(Replxx::Color::DEFAULT)},
{TokenType::Arrow, replxx::color::bold(Replxx::Color::DEFAULT)},
{TokenType::QuestionMark, replxx::color::bold(Replxx::Color::DEFAULT)},
{TokenType::Colon, replxx::color::bold(Replxx::Color::DEFAULT)},
{TokenType::Equals, replxx::color::bold(Replxx::Color::DEFAULT)},
{TokenType::NotEquals, replxx::color::bold(Replxx::Color::DEFAULT)},
{TokenType::Less, replxx::color::bold(Replxx::Color::DEFAULT)},
{TokenType::Greater, replxx::color::bold(Replxx::Color::DEFAULT)},
{TokenType::LessOrEquals, replxx::color::bold(Replxx::Color::DEFAULT)},
{TokenType::GreaterOrEquals, replxx::color::bold(Replxx::Color::DEFAULT)},
{TokenType::Concatenation, replxx::color::bold(Replxx::Color::DEFAULT)},
{TokenType::At, replxx::color::bold(Replxx::Color::DEFAULT)},
{TokenType::DoubleAt, Replxx::Color::MAGENTA},
{TokenType::EndOfStream, Replxx::Color::DEFAULT},
@ -142,7 +142,7 @@ void highlight(const String & query, std::vector<replxx::Replxx::Color> & colors
{TokenType::ErrorDoubleQuoteIsNotClosed, Replxx::Color::RED},
{TokenType::ErrorSinglePipeMark, Replxx::Color::RED},
{TokenType::ErrorWrongNumber, Replxx::Color::RED},
{ TokenType::ErrorMaxQuerySizeExceeded, Replxx::Color::RED }};
{TokenType::ErrorMaxQuerySizeExceeded, Replxx::Color::RED}};
const Replxx::Color unknown_token_color = Replxx::Color::RED;


@ -1,6 +1,7 @@
#include <Client/InternalTextLogs.h>
#include <Core/Block.h>
#include <Interpreters/InternalTextLogsQueue.h>
#include <Interpreters/ProfileEventsExt.h>
#include <Common/typeid_cast.h>
#include <Common/HashTable/Hash.h>
#include <DataTypes/IDataType.h>
@ -13,7 +14,7 @@
namespace DB
{
void InternalTextLogs::write(const Block & block)
void InternalTextLogs::writeLogs(const Block & block)
{
const auto & array_event_time = typeid_cast<const ColumnUInt32 &>(*block.getByName("event_time").column).getData();
const auto & array_microseconds = typeid_cast<const ColumnUInt32 &>(*block.getByName("event_time_microseconds").column).getData();
@ -97,4 +98,69 @@ void InternalTextLogs::write(const Block & block)
}
}
void InternalTextLogs::writeProfileEvents(const Block & block)
{
const auto & column_host_name = typeid_cast<const ColumnString &>(*block.getByName("host_name").column);
const auto & array_current_time = typeid_cast<const ColumnUInt32 &>(*block.getByName("current_time").column).getData();
const auto & array_thread_id = typeid_cast<const ColumnUInt64 &>(*block.getByName("thread_id").column).getData();
const auto & array_type = typeid_cast<const ColumnInt8 &>(*block.getByName("type").column).getData();
const auto & column_name = typeid_cast<const ColumnString &>(*block.getByName("name").column);
const auto & array_value = typeid_cast<const ColumnUInt64 &>(*block.getByName("value").column).getData();
for (size_t row_num = 0; row_num < block.rows(); ++row_num)
{
/// host_name
auto host_name = column_host_name.getDataAt(row_num);
if (host_name.size)
{
writeCString("[", wb);
if (color)
writeString(setColor(StringRefHash()(host_name)), wb);
writeString(host_name, wb);
if (color)
writeCString(resetColor(), wb);
writeCString("] ", wb);
}
/// current_time
auto current_time = array_current_time[row_num];
writeDateTimeText<'.', ':'>(current_time, wb);
/// thread_id
UInt64 thread_id = array_thread_id[row_num];
writeCString(" [ ", wb);
if (color)
writeString(setColor(intHash64(thread_id)), wb);
writeIntText(thread_id, wb);
if (color)
writeCString(resetColor(), wb);
writeCString(" ] ", wb);
/// name
auto name = column_name.getDataAt(row_num);
if (color)
writeString(setColor(StringRefHash()(name)), wb);
DB::writeString(name, wb);
if (color)
writeCString(resetColor(), wb);
writeCString(": ", wb);
/// value
UInt64 value = array_value[row_num];
writeIntText(value, wb);
/// type
Int8 type = array_type[row_num];
writeCString(" (", wb);
if (color)
writeString(setColor(intHash64(type)), wb);
writeString(toString(ProfileEvents::TypeEnum->castToName(type)), wb);
if (color)
writeCString(resetColor(), wb);
writeCString(")", wb);
writeChar('\n', wb);
}
}
}


@ -6,16 +6,37 @@
namespace DB
{
/// Prints internal server logs
/// Input blocks have to have the same structure as SystemLogsQueue::getSampleBlock()
/// Prints internal server logs or profile events with colored output (if requested).
/// NOTE: IRowOutputFormat does not suit this case well
class InternalTextLogs
{
public:
InternalTextLogs(WriteBuffer & buf_out, bool color_) : wb(buf_out), color(color_) {}
void write(const Block & block);
/// Print internal server logs
///
/// Input blocks have to have the same structure as SystemLogsQueue::getSampleBlock():
/// - event_time
/// - event_time_microseconds
/// - host_name
/// - query_id
/// - thread_id
/// - priority
/// - source
/// - text
void writeLogs(const Block & block);
/// Print profile events.
///
/// Block:
/// - host_name
/// - current_time
/// - thread_id
/// - type
/// - name
/// - value
///
/// See also TCPHandler::sendProfileEvents() for block columns.
void writeProfileEvents(const Block & block);
void flush()
{


@ -5,7 +5,7 @@
#include <Processors/Executors/PushingPipelineExecutor.h>
#include <Processors/Executors/PushingAsyncPipelineExecutor.h>
#include <Storages/IStorage.h>
#include "Core/Protocol.h"
#include <Core/Protocol.h>
namespace DB
@ -105,6 +105,16 @@ void LocalConnection::sendQuery(
state->pushing_executor->start();
state->block = state->pushing_executor->getHeader();
}
const auto & table_id = query_context->getInsertionTable();
if (query_context->getSettingsRef().input_format_defaults_for_omitted_fields)
{
if (!table_id.empty())
{
auto storage_ptr = DatabaseCatalog::instance().getTable(table_id, query_context);
state->columns_description = storage_ptr->getInMemoryMetadataPtr()->getColumns();
}
}
}
else if (state->io.pipeline.pulling())
{
@ -117,7 +127,9 @@ void LocalConnection::sendQuery(
executor.execute();
}
if (state->block)
if (state->columns_description)
next_packet_type = Protocol::Server::TableColumns;
else if (state->block)
next_packet_type = Protocol::Server::Data;
}
catch (const Exception & e)
@ -267,19 +279,19 @@ bool LocalConnection::poll(size_t)
}
}
if (state->is_finished && send_progress && !state->sent_progress)
{
state->sent_progress = true;
next_packet_type = Protocol::Server::Progress;
return true;
}
if (state->is_finished)
{
finishQuery();
return true;
}
if (send_progress && !state->sent_progress)
{
state->sent_progress = true;
next_packet_type = Protocol::Server::Progress;
return true;
}
if (state->block && state->block.value())
{
next_packet_type = Protocol::Server::Data;
@ -293,7 +305,8 @@ bool LocalConnection::pollImpl()
{
Block block;
auto next_read = pullBlock(block);
if (block)
if (block && !state->io.null_format)
{
state->block.emplace(block);
}
@ -337,21 +350,41 @@ Packet LocalConnection::receivePacket()
packet.block = std::move(state->block.value());
state->block.reset();
}
next_packet_type.reset();
break;
}
case Protocol::Server::TableColumns:
{
if (state->columns_description)
{
/// Send external table name (empty name is the main table)
/// (see TCPHandler::sendTableColumns)
packet.multistring_message = {"", state->columns_description->toString()};
}
if (state->block)
{
next_packet_type = Protocol::Server::Data;
}
break;
}
case Protocol::Server::Exception:
{
packet.exception = std::make_unique<Exception>(*state->exception);
next_packet_type.reset();
break;
}
case Protocol::Server::Progress:
{
packet.progress = std::move(state->progress);
state->progress.reset();
next_packet_type.reset();
break;
}
case Protocol::Server::EndOfStream:
{
next_packet_type.reset();
break;
}
default:
@ -359,7 +392,6 @@ Packet LocalConnection::receivePacket()
"Unknown packet {} for {}", toString(packet.type), getDescription());
}
next_packet_type.reset();
return packet;
}


@ -5,6 +5,7 @@
#include <QueryPipeline/BlockIO.h>
#include <IO/TimeoutSetter.h>
#include <Interpreters/Session.h>
#include <Storages/ColumnsDescription.h>
namespace DB
@ -33,6 +34,7 @@ struct LocalQueryState
/// Current block to be sent next.
std::optional<Block> block;
std::optional<ColumnsDescription> columns_description;
/// Is request cancelled
bool is_cancelled = false;


@ -589,6 +589,8 @@
M(619, POSTGRESQL_REPLICATION_INTERNAL_ERROR) \
M(620, QUERY_NOT_ALLOWED) \
M(621, CANNOT_NORMALIZE_STRING) \
M(622, CANNOT_PARSE_CAPN_PROTO_SCHEMA) \
M(623, CAPN_PROTO_BAD_CAST) \
\
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \


@ -77,3 +77,6 @@ target_link_libraries (average PRIVATE clickhouse_common_io)
add_executable (shell_command_inout shell_command_inout.cpp)
target_link_libraries (shell_command_inout PRIVATE clickhouse_common_io)
add_executable (executable_udf executable_udf.cpp)
target_link_libraries (executable_udf PRIVATE dbms)


@ -0,0 +1,46 @@
#include <vector>
#include <string>
#include <iomanip>
#include <Common/SipHash.h>
#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Common/Stopwatch.h>
using namespace DB;
int main(int argc, char **argv)
{
(void)(argc);
(void)(argv);
std::string buffer;
ReadBufferFromFileDescriptor read_buffer(0);
WriteBufferFromFileDescriptor write_buffer(1);
size_t rows = 0;
char dummy;
while (!read_buffer.eof())
{
readIntText(rows, read_buffer);
readChar(dummy, read_buffer);
for (size_t i = 0; i < rows; ++i)
{
readString(buffer, read_buffer);
readChar(dummy, read_buffer);
writeString("Key ", write_buffer);
writeString(buffer, write_buffer);
writeChar('\n', write_buffer);
}
write_buffer.next();
}
return 0;
}


@ -118,7 +118,7 @@ bool pathStartsWith(const std::filesystem::path & path, const std::filesystem::p
return absolute_path.starts_with(absolute_prefix_path);
}
bool symlinkStartsWith(const std::filesystem::path & path, const std::filesystem::path & prefix_path)
bool fileOrSymlinkPathStartsWith(const std::filesystem::path & path, const std::filesystem::path & prefix_path)
{
/// Differs from pathStartsWith in how `path` is normalized before comparison.
/// Make `path` absolute if it was relative and put it into normalized form: remove
@ -140,13 +140,14 @@ bool pathStartsWith(const String & path, const String & prefix_path)
return pathStartsWith(filesystem_path, filesystem_prefix_path);
}
bool symlinkStartsWith(const String & path, const String & prefix_path)
bool fileOrSymlinkPathStartsWith(const String & path, const String & prefix_path)
{
auto filesystem_path = std::filesystem::path(path);
auto filesystem_prefix_path = std::filesystem::path(prefix_path);
return symlinkStartsWith(filesystem_path, filesystem_prefix_path);
return fileOrSymlinkPathStartsWith(filesystem_path, filesystem_prefix_path);
}
}


@ -35,8 +35,9 @@ bool pathStartsWith(const std::filesystem::path & path, const std::filesystem::p
/// Returns true if path starts with prefix path
bool pathStartsWith(const String & path, const String & prefix_path);
/// Returns true if symlink starts with prefix path
bool symlinkStartsWith(const String & path, const String & prefix_path);
/// Same as pathStartsWith, but without canonicalization, i.e. it can also be used to check symlinks.
/// (The path is made absolute and normalized.)
bool fileOrSymlinkPathStartsWith(const String & path, const String & prefix_path);
}


@ -625,7 +625,8 @@ class IColumn;
M(Bool, cross_to_inner_join_rewrite, true, "Use inner join instead of comma/cross join if possible", 0) \
\
M(Bool, output_format_arrow_low_cardinality_as_dictionary, false, "Enable output LowCardinality type as Dictionary Arrow type", 0) \
\
M(EnumComparingMode, format_capn_proto_enum_comparising_mode, FormatSettings::EnumComparingMode::BY_VALUES, "How to map ClickHouse Enum and CapnProto Enum", 0)\
// End of FORMAT_FACTORY_SETTINGS
// Please add settings non-related to formats into the COMMON_SETTINGS above.


@ -116,4 +116,9 @@ IMPLEMENT_SETTING_ENUM(ShortCircuitFunctionEvaluation, ErrorCodes::BAD_ARGUMENTS
{{"enable", ShortCircuitFunctionEvaluation::ENABLE},
{"force_enable", ShortCircuitFunctionEvaluation::FORCE_ENABLE},
{"disable", ShortCircuitFunctionEvaluation::DISABLE}})
IMPLEMENT_SETTING_ENUM(EnumComparingMode, ErrorCodes::BAD_ARGUMENTS,
{{"by_names", FormatSettings::EnumComparingMode::BY_NAMES},
{"by_values", FormatSettings::EnumComparingMode::BY_VALUES},
{"by_names_case_insensitive", FormatSettings::EnumComparingMode::BY_NAMES_CASE_INSENSITIVE}})
}


@ -168,4 +168,6 @@ enum class ShortCircuitFunctionEvaluation
DECLARE_SETTING_ENUM(ShortCircuitFunctionEvaluation)
DECLARE_SETTING_ENUM_WITH_RENAME(EnumComparingMode, FormatSettings::EnumComparingMode)
}


@ -1,4 +1,5 @@
#include <DataTypes/EnumValues.h>
#include <boost/algorithm/string.hpp>
namespace DB
{
@ -82,6 +83,24 @@ Names EnumValues<T>::getAllRegisteredNames() const
return result;
}
template <typename T>
std::unordered_set<String> EnumValues<T>::getSetOfAllNames(bool to_lower) const
{
std::unordered_set<String> result;
for (const auto & value : values)
result.insert(to_lower ? boost::algorithm::to_lower_copy(value.first) : value.first);
return result;
}
template <typename T>
std::unordered_set<T> EnumValues<T>::getSetOfAllValues() const
{
std::unordered_set<T> result;
for (const auto & value : values)
result.insert(value.second);
return result;
}
template class EnumValues<Int8>;
template class EnumValues<Int16>;


@ -80,6 +80,10 @@ public:
}
Names getAllRegisteredNames() const override;
std::unordered_set<String> getSetOfAllNames(bool to_lower) const;
std::unordered_set<T> getSetOfAllValues() const;
};
}


@ -100,7 +100,7 @@ Pipe ExecutablePoolDictionarySource::getStreamForBlock(const Block & block)
config.terminate_in_destructor_strategy = ShellCommand::DestructorStrategy{ true /*terminate_in_destructor*/, configuration.command_termination_timeout };
auto shell_command = ShellCommand::execute(config);
return shell_command;
}, configuration.max_command_execution_time * 1000);
}, configuration.max_command_execution_time * 10000);
if (!result)
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED,


@ -31,7 +31,7 @@ FileDictionarySource::FileDictionarySource(
, context(context_)
{
auto user_files_path = context->getUserFilesPath();
if (created_from_ddl && !pathStartsWith(filepath, user_files_path))
if (created_from_ddl && !fileOrSymlinkPathStartsWith(filepath, user_files_path))
throw Exception(ErrorCodes::PATH_ACCESS_DENIED, "File path {} is not inside {}", filepath, user_files_path);
}


@ -41,13 +41,7 @@ LibraryDictionarySource::LibraryDictionarySource(
, context(Context::createCopy(context_))
{
auto dictionaries_lib_path = context->getDictionariesLibPath();
bool path_checked = false;
if (fs::is_symlink(path))
path_checked = symlinkStartsWith(path, dictionaries_lib_path);
else
path_checked = pathStartsWith(path, dictionaries_lib_path);
if (created_from_ddl && !path_checked)
if (created_from_ddl && !fileOrSymlinkPathStartsWith(path, dictionaries_lib_path))
throw Exception(ErrorCodes::PATH_ACCESS_DENIED, "File path {} is not inside {}", path, dictionaries_lib_path);
if (!fs::exists(path))


@ -0,0 +1,432 @@
#include <Formats/CapnProtoUtils.h>
#if USE_CAPNP
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/IDataType.h>
#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/join.hpp>
#include <capnp/schema.h>
#include <capnp/schema-parser.h>
#include <fcntl.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_PARSE_CAPN_PROTO_SCHEMA;
extern const int THERE_IS_NO_COLUMN;
extern const int BAD_TYPE_OF_FIELD;
extern const int CAPN_PROTO_BAD_CAST;
extern const int FILE_DOESNT_EXIST;
extern const int UNKNOWN_EXCEPTION;
extern const int INCORRECT_DATA;
}
capnp::StructSchema CapnProtoSchemaParser::getMessageSchema(const FormatSchemaInfo & schema_info)
{
capnp::ParsedSchema schema;
try
{
int fd;
KJ_SYSCALL(fd = open(schema_info.schemaDirectory().data(), O_RDONLY));
auto schema_dir = kj::newDiskDirectory(kj::OsFileHandle(fd));
schema = impl.parseFromDirectory(*schema_dir, kj::Path::parse(schema_info.schemaPath()), {});
}
catch (const kj::Exception & e)
{
/// It's not ideal to determine the type of error by its description, but
/// this is the only way to do it here, because kj doesn't specify the type of error.
auto description = std::string_view(e.getDescription().cStr());
if (description.find("No such file or directory") != String::npos || description.find("no such directory") != String::npos)
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "Cannot open CapnProto schema, file {} doesn't exist", schema_info.absoluteSchemaPath());
if (description.find("Parse error") != String::npos)
throw Exception(ErrorCodes::CANNOT_PARSE_CAPN_PROTO_SCHEMA, "Cannot parse CapnProto schema {}:{}", schema_info.schemaPath(), e.getLine());
throw Exception(ErrorCodes::UNKNOWN_EXCEPTION, "Unknown exception while parsing CapnProto schema: {}, schema dir and file: {}, {}", description, schema_info.schemaDirectory(), schema_info.schemaPath());
}
auto message_maybe = schema.findNested(schema_info.messageName());
auto * message_schema = kj::_::readMaybe(message_maybe);
if (!message_schema)
throw Exception(ErrorCodes::CANNOT_PARSE_CAPN_PROTO_SCHEMA, "CapnProto schema doesn't contain message with name {}", schema_info.messageName());
return message_schema->asStruct();
}
bool compareEnumNames(const String & first, const String & second, FormatSettings::EnumComparingMode mode)
{
if (mode == FormatSettings::EnumComparingMode::BY_NAMES_CASE_INSENSITIVE)
return boost::algorithm::to_lower_copy(first) == boost::algorithm::to_lower_copy(second);
return first == second;
}
static const std::map<capnp::schema::Type::Which, String> capnp_simple_type_names =
{
{capnp::schema::Type::Which::BOOL, "Bool"},
{capnp::schema::Type::Which::VOID, "Void"},
{capnp::schema::Type::Which::INT8, "Int8"},
{capnp::schema::Type::Which::INT16, "Int16"},
{capnp::schema::Type::Which::INT32, "Int32"},
{capnp::schema::Type::Which::INT64, "Int64"},
{capnp::schema::Type::Which::UINT8, "UInt8"},
{capnp::schema::Type::Which::UINT16, "UInt16"},
{capnp::schema::Type::Which::UINT32, "UInt32"},
{capnp::schema::Type::Which::UINT64, "UInt64"},
{capnp::schema::Type::Which::FLOAT32, "Float32"},
{capnp::schema::Type::Which::FLOAT64, "Float64"},
{capnp::schema::Type::Which::TEXT, "Text"},
{capnp::schema::Type::Which::DATA, "Data"},
{capnp::schema::Type::Which::INTERFACE, "Interface"},
{capnp::schema::Type::Which::ANY_POINTER, "AnyPointer"},
};
static bool checkIfStructContainsUnnamedUnion(const capnp::StructSchema & struct_schema)
{
return struct_schema.getFields().size() != struct_schema.getNonUnionFields().size();
}
static bool checkIfStructIsNamedUnion(const capnp::StructSchema & struct_schema)
{
return struct_schema.getFields().size() == struct_schema.getUnionFields().size();
}
/// Get full name of type for better exception messages.
static String getCapnProtoFullTypeName(const capnp::Type & type)
{
switch (type.which())
{
case capnp::schema::Type::Which::STRUCT:
{
auto struct_schema = type.asStruct();
auto non_union_fields = struct_schema.getNonUnionFields();
std::vector<String> non_union_field_names;
for (auto nested_field : non_union_fields)
non_union_field_names.push_back(String(nested_field.getProto().getName()) + " " + getCapnProtoFullTypeName(nested_field.getType()));
auto union_fields = struct_schema.getUnionFields();
std::vector<String> union_field_names;
for (auto nested_field : union_fields)
union_field_names.push_back(String(nested_field.getProto().getName()) + " " + getCapnProtoFullTypeName(nested_field.getType()));
String union_name = "Union(" + boost::algorithm::join(union_field_names, ", ") + ")";
/// Check if the struct is a named union.
if (non_union_field_names.empty())
return union_name;
String type_name = "Struct(" + boost::algorithm::join(non_union_field_names, ", ");
/// Check if the struct contains unnamed union.
if (!union_field_names.empty())
type_name += ", " + union_name;
type_name += ")";
return type_name;
}
case capnp::schema::Type::Which::LIST:
return "List(" + getCapnProtoFullTypeName(type.asList().getElementType()) + ")";
case capnp::schema::Type::Which::ENUM:
{
auto enum_schema = type.asEnum();
String enum_name = "Enum(";
auto enumerants = enum_schema.getEnumerants();
for (size_t i = 0; i != enumerants.size(); ++i)
{
enum_name += String(enumerants[i].getProto().getName()) + " = " + std::to_string(enumerants[i].getOrdinal());
if (i + 1 != enumerants.size())
enum_name += ", ";
}
enum_name += ")";
return enum_name;
}
default:
auto it = capnp_simple_type_names.find(type.which());
if (it == capnp_simple_type_names.end())
throw Exception(ErrorCodes::BAD_TYPE_OF_FIELD, "Unknown CapnProto type");
return it->second;
}
}
template <typename Type>
static bool checkEnums(const capnp::Type & capnp_type, const DataTypePtr column_type, FormatSettings::EnumComparingMode mode, UInt64 max_value, String & error_message)
{
if (!capnp_type.isEnum())
return false;
auto enum_schema = capnp_type.asEnum();
bool to_lower = mode == FormatSettings::EnumComparingMode::BY_NAMES_CASE_INSENSITIVE;
const auto * enum_type = assert_cast<const DataTypeEnum<Type> *>(column_type.get());
const auto & enum_values = dynamic_cast<const EnumValues<Type> &>(*enum_type);
auto enumerants = enum_schema.getEnumerants();
if (mode == FormatSettings::EnumComparingMode::BY_VALUES)
{
/// In CapnProto Enum fields are numbered sequentially starting from zero.
if (enumerants.size() > max_value)
{
error_message += "Enum from CapnProto schema contains values that is out of range for Clickhouse Enum";
return false;
}
auto values = enum_values.getSetOfAllValues();
std::unordered_set<Type> capn_enum_values;
for (auto enumerant : enumerants)
capn_enum_values.insert(Type(enumerant.getOrdinal()));
auto result = values == capn_enum_values;
if (!result)
error_message += "The set of values in Enum from CapnProto schema is different from the set of values in ClickHouse Enum";
return result;
}
auto names = enum_values.getSetOfAllNames(to_lower);
std::unordered_set<String> capn_enum_names;
for (auto enumerant : enumerants)
{
String name = enumerant.getProto().getName();
capn_enum_names.insert(to_lower ? boost::algorithm::to_lower_copy(name) : name);
}
auto result = names == capn_enum_names;
if (!result)
error_message += "The set of names in Enum from CapnProto schema is different from the set of names in ClickHouse Enum";
return result;
}
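// Illustrative aside, not part of this patch: the comparison rule above reduced to plain STL types
// (no ClickHouse/CapnProto classes). Under BY_VALUES the sets of numeric values must coincide
// (CapnProto enumerants are numbered from zero); under BY_NAMES the sets of names must coincide,
// optionally compared case-insensitively. A minimal standalone sketch:
#include <algorithm>
#include <cassert>
#include <cctype>
#include <string>
#include <unordered_set>

static bool enumsMatchByValues(const std::unordered_set<int> & clickhouse_values, const std::unordered_set<int> & capnp_values)
{
    return clickhouse_values == capnp_values;
}

static bool enumsMatchByNames(const std::unordered_set<std::string> & clickhouse_names, const std::unordered_set<std::string> & capnp_names, bool case_insensitive)
{
    auto lowered = [](const std::unordered_set<std::string> & names)
    {
        std::unordered_set<std::string> result;
        for (auto name : names)
        {
            std::transform(name.begin(), name.end(), name.begin(), [](unsigned char c) { return std::tolower(c); });
            result.insert(std::move(name));
        }
        return result;
    };
    return case_insensitive ? lowered(clickhouse_names) == lowered(capnp_names) : clickhouse_names == capnp_names;
}

int main()
{
    assert(enumsMatchByValues({0, 1, 2}, {0, 1, 2}));
    assert(!enumsMatchByValues({0, 1}, {1, 2}));
    assert(enumsMatchByNames({"Red", "Green"}, {"red", "green"}, /*case_insensitive=*/ true));
    assert(!enumsMatchByNames({"Red"}, {"Blue"}, false));
}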
static bool checkCapnProtoType(const capnp::Type & capnp_type, const DataTypePtr & data_type, FormatSettings::EnumComparingMode mode, String & error_message);
static bool checkNullableType(const capnp::Type & capnp_type, const DataTypePtr & data_type, FormatSettings::EnumComparingMode mode, String & error_message)
{
if (!capnp_type.isStruct())
return false;
/// Check that the struct is a named union of Void type and one arbitrary type.
auto struct_schema = capnp_type.asStruct();
if (!checkIfStructIsNamedUnion(struct_schema))
return false;
auto union_fields = struct_schema.getUnionFields();
if (union_fields.size() != 2)
return false;
auto first = union_fields[0];
auto second = union_fields[1];
auto nested_type = assert_cast<const DataTypeNullable *>(data_type.get())->getNestedType();
if (first.getType().isVoid())
return checkCapnProtoType(second.getType(), nested_type, mode, error_message);
if (second.getType().isVoid())
return checkCapnProtoType(first.getType(), nested_type, mode, error_message);
return false;
}
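// Illustrative aside, not part of this patch: the convention enforced above is that Nullable(T)
// maps onto a CapnProto struct that is a named union with exactly two members, one of them Void
// (e.g. "nullable :union { null @0 :Void; value @1 :UInt64; }"). A compact sketch of that shape
// check over a hypothetical, simplified field model (not the real capnp API):
#include <string>
#include <vector>

struct UnionMemberSketch
{
    std::string name;
    bool is_void = false;
};

/// True when the union can carry a Nullable value: two members, exactly one of them Void.
static bool canRepresentNullable(const std::vector<UnionMemberSketch> & members)
{
    return members.size() == 2 && (members[0].is_void != members[1].is_void);
}

int main()
{
    std::vector<UnionMemberSketch> nullable_like = {{"null", true}, {"value", false}};
    std::vector<UnionMemberSketch> plain_pair = {{"a", false}, {"b", false}};
    return (canRepresentNullable(nullable_like) && !canRepresentNullable(plain_pair)) ? 0 : 1;
}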
static bool checkTupleType(const capnp::Type & capnp_type, const DataTypePtr & data_type, FormatSettings::EnumComparingMode mode, String & error_message)
{
if (!capnp_type.isStruct())
return false;
auto struct_schema = capnp_type.asStruct();
if (checkIfStructIsNamedUnion(struct_schema))
return false;
if (checkIfStructContainsUnnamedUnion(struct_schema))
{
error_message += "CapnProto struct contains unnamed union";
return false;
}
const auto * tuple_data_type = assert_cast<const DataTypeTuple *>(data_type.get());
auto nested_types = tuple_data_type->getElements();
if (nested_types.size() != struct_schema.getFields().size())
{
error_message += "Tuple and Struct types have different sizes";
return false;
}
if (!tuple_data_type->haveExplicitNames())
{
error_message += "Only named Tuple can be converted to CapnProto Struct";
return false;
}
for (const auto & name : tuple_data_type->getElementNames())
{
KJ_IF_MAYBE(field, struct_schema.findFieldByName(name))
{
if (!checkCapnProtoType(field->getType(), nested_types[tuple_data_type->getPositionByName(name)], mode, error_message))
return false;
}
else
{
error_message += "CapnProto struct doesn't contain a field with name " + name;
return false;
}
}
return true;
}
static bool checkArrayType(const capnp::Type & capnp_type, const DataTypePtr & data_type, FormatSettings::EnumComparingMode mode, String & error_message)
{
if (!capnp_type.isList())
return false;
auto list_schema = capnp_type.asList();
auto nested_type = assert_cast<const DataTypeArray *>(data_type.get())->getNestedType();
return checkCapnProtoType(list_schema.getElementType(), nested_type, mode, error_message);
}
static bool checkCapnProtoType(const capnp::Type & capnp_type, const DataTypePtr & data_type, FormatSettings::EnumComparingMode mode, String & error_message)
{
switch (data_type->getTypeId())
{
case TypeIndex::UInt8:
return capnp_type.isBool() || capnp_type.isUInt8();
case TypeIndex::Date: [[fallthrough]];
case TypeIndex::UInt16:
return capnp_type.isUInt16();
case TypeIndex::DateTime: [[fallthrough]];
case TypeIndex::UInt32:
return capnp_type.isUInt32();
case TypeIndex::UInt64:
return capnp_type.isUInt64();
case TypeIndex::Int8:
return capnp_type.isInt8();
case TypeIndex::Int16:
return capnp_type.isInt16();
case TypeIndex::Date32: [[fallthrough]];
case TypeIndex::Int32:
return capnp_type.isInt32();
case TypeIndex::DateTime64: [[fallthrough]];
case TypeIndex::Int64:
return capnp_type.isInt64();
case TypeIndex::Float32:
return capnp_type.isFloat32();
case TypeIndex::Float64:
return capnp_type.isFloat64();
case TypeIndex::Enum8:
return checkEnums<Int8>(capnp_type, data_type, mode, INT8_MAX, error_message);
case TypeIndex::Enum16:
return checkEnums<Int16>(capnp_type, data_type, mode, INT16_MAX, error_message);
case TypeIndex::Tuple:
return checkTupleType(capnp_type, data_type, mode, error_message);
case TypeIndex::Nullable:
{
auto result = checkNullableType(capnp_type, data_type, mode, error_message);
if (!result)
error_message += "Nullable can be represented only as a named union of type Void and nested type";
return result;
}
case TypeIndex::Array:
return checkArrayType(capnp_type, data_type, mode, error_message);
case TypeIndex::LowCardinality:
return checkCapnProtoType(capnp_type, assert_cast<const DataTypeLowCardinality *>(data_type.get())->getDictionaryType(), mode, error_message);
case TypeIndex::FixedString: [[fallthrough]];
case TypeIndex::String:
return capnp_type.isText() || capnp_type.isData();
default:
return false;
}
}
static std::pair<String, String> splitFieldName(const String & name)
{
const auto * begin = name.data();
const auto * end = name.data() + name.size();
const auto * it = find_first_symbols<'_', '.'>(begin, end);
String first = String(begin, it);
String second = it == end ? "" : String(it + 1, end);
return {first, second};
}
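// Illustrative aside, not part of this patch: the splitting rule above takes everything up to the
// first '_' or '.' as the current struct field and leaves the remainder for the next recursion
// level, so a header column such as "msg.text" or "msg_text" walks one nested struct per step.
// Standalone sketch with find_first_symbols replaced by std::string_view::find_first_of:
#include <cassert>
#include <string>
#include <string_view>
#include <utility>

static std::pair<std::string, std::string> splitFieldNameSketch(std::string_view name)
{
    size_t pos = name.find_first_of("_.");
    if (pos == std::string_view::npos)
        return {std::string(name), {}};
    return {std::string(name.substr(0, pos)), std::string(name.substr(pos + 1))};
}

int main()
{
    assert(splitFieldNameSketch("message") == std::make_pair(std::string("message"), std::string()));
    assert(splitFieldNameSketch("msg.text").first == "msg");
    assert(splitFieldNameSketch("msg_text").second == "text");
}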
capnp::DynamicValue::Reader getReaderByColumnName(const capnp::DynamicStruct::Reader & struct_reader, const String & name)
{
auto [field_name, nested_name] = splitFieldName(name);
KJ_IF_MAYBE(field, struct_reader.getSchema().findFieldByName(field_name))
{
capnp::DynamicValue::Reader field_reader;
try
{
field_reader = struct_reader.get(*field);
}
catch (const kj::Exception & e)
{
throw Exception(ErrorCodes::INCORRECT_DATA, "Cannot extract field value from struct by provided schema, error: {}. Perhaps the data was generated by another schema", String(e.getDescription().cStr()));
}
if (nested_name.empty())
return field_reader;
if (field_reader.getType() != capnp::DynamicValue::STRUCT)
throw Exception(ErrorCodes::CAPN_PROTO_BAD_CAST, "Field {} is not a struct", field_name);
return getReaderByColumnName(field_reader.as<capnp::DynamicStruct>(), nested_name);
}
throw Exception(ErrorCodes::THERE_IS_NO_COLUMN, "Capnproto struct doesn't contain field with name {}", field_name);
}
std::pair<capnp::DynamicStruct::Builder, capnp::StructSchema::Field> getStructBuilderAndFieldByColumnName(capnp::DynamicStruct::Builder struct_builder, const String & name)
{
auto [field_name, nested_name] = splitFieldName(name);
KJ_IF_MAYBE(field, struct_builder.getSchema().findFieldByName(field_name))
{
if (nested_name.empty())
return {struct_builder, *field};
auto field_builder = struct_builder.get(*field);
if (field_builder.getType() != capnp::DynamicValue::STRUCT)
throw Exception(ErrorCodes::CAPN_PROTO_BAD_CAST, "Field {} is not a struct", field_name);
return getStructBuilderAndFieldByColumnName(field_builder.as<capnp::DynamicStruct>(), nested_name);
}
throw Exception(ErrorCodes::THERE_IS_NO_COLUMN, "Capnproto struct doesn't contain field with name {}", field_name);
}
static capnp::StructSchema::Field getFieldByName(const capnp::StructSchema & schema, const String & name)
{
auto [field_name, nested_name] = splitFieldName(name);
KJ_IF_MAYBE(field, schema.findFieldByName(field_name))
{
if (nested_name.empty())
return *field;
if (!field->getType().isStruct())
throw Exception(ErrorCodes::CAPN_PROTO_BAD_CAST, "Field {} is not a struct", field_name);
return getFieldByName(field->getType().asStruct(), nested_name);
}
throw Exception(ErrorCodes::THERE_IS_NO_COLUMN, "Capnproto schema doesn't contain field with name {}", field_name);
}
void checkCapnProtoSchemaStructure(const capnp::StructSchema & schema, const Block & header, FormatSettings::EnumComparingMode mode)
{
/// First, check that the struct doesn't contain an unnamed union, because we don't support it.
if (checkIfStructContainsUnnamedUnion(schema))
throw Exception(ErrorCodes::CAPN_PROTO_BAD_CAST, "Schema contains unnamed union that is not supported");
auto names_and_types = header.getNamesAndTypesList();
String additional_error_message;
for (auto & [name, type] : names_and_types)
{
auto field = getFieldByName(schema, name);
if (!checkCapnProtoType(field.getType(), type, mode, additional_error_message))
{
auto e = Exception(
ErrorCodes::CAPN_PROTO_BAD_CAST,
"Cannot convert ClickHouse type {} to CapnProto type {}",
type->getName(),
getCapnProtoFullTypeName(field.getType()));
if (!additional_error_message.empty())
e.addMessage(additional_error_message);
throw std::move(e);
}
}
}
}
#endif

View File

@ -0,0 +1,43 @@
#pragma once
#include "config_formats.h"
#if USE_CAPNP
#include <Formats/FormatSchemaInfo.h>
#include <Formats/FormatSettings.h>
#include <Core/Block.h>
#include <capnp/schema-parser.h>
#include <capnp/dynamic.h>
namespace DB
{
// Wrapper for classes that could throw in destructor
// https://github.com/capnproto/capnproto/issues/553
template <typename T>
struct DestructorCatcher
{
T impl;
template <typename ... Arg>
DestructorCatcher(Arg && ... args) : impl(kj::fwd<Arg>(args)...) {}
~DestructorCatcher() noexcept try { } catch (...) { return; }
};
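// Illustrative aside, not part of this patch: DestructorCatcher exists because capnp::SchemaParser
// may throw from its destructor (see the linked issue), and an exception escaping a destructor
// during stack unwinding terminates the program. The function-try-block swallows it; the explicit
// `return` matters because reaching the end of a destructor's function-try-block handler would
// rethrow. Standalone sketch of the same pattern with a made-up throwing type:
#include <iostream>
#include <stdexcept>
#include <utility>

struct ThrowsOnDestroy
{
    ~ThrowsOnDestroy() noexcept(false) { throw std::runtime_error("cleanup failed"); }
};

template <typename T>
struct CatcherSketch
{
    T impl;
    template <typename... Args>
    explicit CatcherSketch(Args &&... args) : impl(std::forward<Args>(args)...) {}
    ~CatcherSketch() noexcept try { } catch (...) { return; }
};

int main()
{
    {
        CatcherSketch<ThrowsOnDestroy> guarded; /// exception from ~ThrowsOnDestroy is caught here
    }
    std::cout << "survived the throwing destructor\n";
}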
class CapnProtoSchemaParser : public DestructorCatcher<capnp::SchemaParser>
{
public:
CapnProtoSchemaParser() {}
capnp::StructSchema getMessageSchema(const FormatSchemaInfo & schema_info);
};
bool compareEnumNames(const String & first, const String & second, FormatSettings::EnumComparingMode mode);
std::pair<capnp::DynamicStruct::Builder, capnp::StructSchema::Field> getStructBuilderAndFieldByColumnName(capnp::DynamicStruct::Builder struct_builder, const String & name);
capnp::DynamicValue::Reader getReaderByColumnName(const capnp::DynamicStruct::Reader & struct_reader, const String & name);
void checkCapnProtoSchemaStructure(const capnp::StructSchema & schema, const Block & header, FormatSettings::EnumComparingMode mode);
}
#endif

View File

@ -111,6 +111,7 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.arrow.low_cardinality_as_dictionary = settings.output_format_arrow_low_cardinality_as_dictionary;
format_settings.arrow.import_nested = settings.input_format_arrow_import_nested;
format_settings.orc.import_nested = settings.input_format_orc_import_nested;
format_settings.capn_proto.enum_comparing_mode = settings.format_capn_proto_enum_comparising_mode;
/// Validate avro_schema_registry_url with RemoteHostFilter when non-empty and in Server context
if (format_settings.schema.is_server)

View File

@ -42,7 +42,7 @@ FormatSettings getFormatSettings(ContextPtr context);
template <typename T>
FormatSettings getFormatSettings(ContextPtr context, const T & settings);
/** Allows to create an IBlockInputStream or IBlockOutputStream by the name of the format.
/** Allows to create an IInputFormat or IOutputFormat by the name of the format.
* Note: format and compression are independent things.
*/
class FormatFactory final : private boost::noncopyable

View File

@ -99,4 +99,10 @@ FormatSchemaInfo::FormatSchemaInfo(const String & format_schema, const String &
}
}
FormatSchemaInfo::FormatSchemaInfo(const FormatSettings & settings, const String & format, bool require_message)
: FormatSchemaInfo(
settings.schema.format_schema, format, require_message, settings.schema.is_server, settings.schema.format_schema_path)
{
}
}

View File

@ -1,6 +1,7 @@
#pragma once
#include <base/types.h>
#include <Formats/FormatSettings.h>
namespace DB
{
@ -11,6 +12,7 @@ class FormatSchemaInfo
{
public:
FormatSchemaInfo(const String & format_schema, const String & format, bool require_message, bool is_server, const std::string & format_schema_path);
FormatSchemaInfo(const FormatSettings & settings, const String & format, bool require_message);
/// Returns path to the schema file.
const String & schemaPath() const { return schema_path; }

View File

@ -183,6 +183,20 @@ struct FormatSettings
{
bool import_nested = false;
} orc;
/// For the CapnProto format we should determine how to
/// compare a ClickHouse Enum with an Enum from the schema.
enum class EnumComparingMode
{
BY_NAMES, // Names in enums should be the same, values can be different.
BY_NAMES_CASE_INSENSITIVE, // Case-insensitive name comparison.
BY_VALUES, // Values should be the same, names can be different.
};
struct
{
EnumComparingMode enum_comparing_mode = EnumComparingMode::BY_VALUES;
} capn_proto;
};
}

View File

@ -56,7 +56,6 @@ NativeReader::NativeReader(ReadBuffer & istr_, UInt64 server_revision_,
}
}
// also resets few vars from IBlockInputStream (I didn't want to propagate resetParser upthere)
void NativeReader::resetParser()
{
istr_concrete = nullptr;

View File

@ -67,6 +67,7 @@ void registerOutputFormatNull(FormatFactory & factory);
void registerOutputFormatMySQLWire(FormatFactory & factory);
void registerOutputFormatMarkdown(FormatFactory & factory);
void registerOutputFormatPostgreSQLWire(FormatFactory & factory);
void registerOutputFormatCapnProto(FormatFactory & factory);
/// Input only formats.
@ -139,6 +140,7 @@ void registerFormats()
registerOutputFormatMySQLWire(factory);
registerOutputFormatMarkdown(factory);
registerOutputFormatPostgreSQLWire(factory);
registerOutputFormatCapnProto(factory);
registerInputFormatRegexp(factory);
registerInputFormatJSONAsString(factory);

View File

@ -96,6 +96,9 @@ struct ReplaceRegexpImpl
re2_st::StringPiece matches[max_captures];
size_t start_pos = 0;
bool is_first_match = true;
bool is_start_pos_added_one = false;
while (start_pos < static_cast<size_t>(input.length()))
{
/// If no more replacements possible for current string
@ -103,6 +106,9 @@ struct ReplaceRegexpImpl
if (searcher.Match(input, start_pos, input.length(), re2_st::RE2::Anchor::UNANCHORED, matches, num_captures))
{
if (is_start_pos_added_one)
start_pos -= 1;
const auto & match = matches[0];
size_t bytes_to_copy = (match.data() - input.data()) - start_pos;
@ -112,6 +118,13 @@ struct ReplaceRegexpImpl
res_offset += bytes_to_copy;
start_pos += bytes_to_copy + match.length();
/// To avoid infinite loop.
if (is_first_match && match.length() == 0 && !replace_one && input.length() > 1)
{
start_pos += 1;
is_start_pos_added_one = true;
}
/// Do substitution instructions
for (const auto & it : instructions)
{
@ -129,8 +142,9 @@ struct ReplaceRegexpImpl
}
}
if (replace_one || match.length() == 0) /// Stop after match of zero length, to avoid infinite loop.
if (replace_one || (!is_first_match && match.length() == 0))
can_finish_current_string = true;
is_first_match = false;
}
else
can_finish_current_string = true;
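// Illustrative aside, not part of this patch: the bookkeeping added here guards against an infinite
// loop when the pattern can match the empty string, by stepping the search position one byte past a
// zero-length match. A standalone sketch of the underlying idea, using std::regex instead of re2
// and not the patch's exact bookkeeping:
#include <iostream>
#include <regex>
#include <string>

static std::string replaceAllSketch(const std::string & input, const std::regex & pattern, const std::string & replacement)
{
    std::string result;
    size_t start_pos = 0;
    while (start_pos <= input.size())
    {
        std::smatch match;
        std::string tail = input.substr(start_pos);
        if (!std::regex_search(tail, match, pattern))
        {
            result += tail;
            break;
        }
        result += tail.substr(0, match.position(0));
        result += replacement;
        size_t advance = match.position(0) + match.length(0);
        if (match.length(0) == 0)
        {
            /// Zero-length match: copy one original character and step over it, otherwise the same
            /// empty match would be found at the same position forever.
            if (start_pos + advance < input.size())
                result += input[start_pos + advance];
            ++advance;
        }
        start_pos += advance;
    }
    return result;
}

int main()
{
    std::cout << replaceAllSketch("abc", std::regex("x*"), "-") << '\n'; /// prints "-a-b-c-"
}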

View File

@ -40,6 +40,7 @@ public:
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return true; }
bool useDefaultImplementationForConstants() const override { return true; }
bool useDefaultImplementationForNulls() const override { return false; }
ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {0}; }
DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override;

View File

@ -18,10 +18,10 @@ namespace ErrorCodes
template <class DataTypeName, class Geometry, class Serializer, class NameHolder>
class FunctionReadWkt : public IFunction
class FunctionReadWKT : public IFunction
{
public:
explicit FunctionReadWkt() = default;
explicit FunctionReadWKT() = default;
static constexpr const char * name = NameHolder::name;
@ -72,36 +72,36 @@ public:
static FunctionPtr create(ContextPtr)
{
return std::make_shared<FunctionReadWkt<DataTypeName, Geometry, Serializer, NameHolder>>();
return std::make_shared<FunctionReadWKT<DataTypeName, Geometry, Serializer, NameHolder>>();
}
};
struct ReadWktPointNameHolder
struct ReadWKTPointNameHolder
{
static constexpr const char * name = "readWktPoint";
static constexpr const char * name = "readWKTPoint";
};
struct ReadWktRingNameHolder
struct ReadWKTRingNameHolder
{
static constexpr const char * name = "readWktRing";
static constexpr const char * name = "readWKTRing";
};
struct ReadWktPolygonNameHolder
struct ReadWKTPolygonNameHolder
{
static constexpr const char * name = "readWktPolygon";
static constexpr const char * name = "readWKTPolygon";
};
struct ReadWktMultiPolygonNameHolder
struct ReadWKTMultiPolygonNameHolder
{
static constexpr const char * name = "readWktMultiPolygon";
static constexpr const char * name = "readWKTMultiPolygon";
};
void registerFunctionReadWkt(FunctionFactory & factory)
void registerFunctionReadWKT(FunctionFactory & factory)
{
factory.registerFunction<FunctionReadWkt<DataTypePointName, CartesianPoint, PointSerializer<CartesianPoint>, ReadWktPointNameHolder>>();
factory.registerFunction<FunctionReadWkt<DataTypeRingName, CartesianRing, RingSerializer<CartesianPoint>, ReadWktRingNameHolder>>();
factory.registerFunction<FunctionReadWkt<DataTypePolygonName, CartesianPolygon, PolygonSerializer<CartesianPoint>, ReadWktPolygonNameHolder>>();
factory.registerFunction<FunctionReadWkt<DataTypeMultiPolygonName, CartesianMultiPolygon, MultiPolygonSerializer<CartesianPoint>, ReadWktMultiPolygonNameHolder>>();
factory.registerFunction<FunctionReadWKT<DataTypePointName, CartesianPoint, PointSerializer<CartesianPoint>, ReadWKTPointNameHolder>>();
factory.registerFunction<FunctionReadWKT<DataTypeRingName, CartesianRing, RingSerializer<CartesianPoint>, ReadWKTRingNameHolder>>();
factory.registerFunction<FunctionReadWKT<DataTypePolygonName, CartesianPolygon, PolygonSerializer<CartesianPoint>, ReadWKTPolygonNameHolder>>();
factory.registerFunction<FunctionReadWKT<DataTypeMultiPolygonName, CartesianMultiPolygon, MultiPolygonSerializer<CartesianPoint>, ReadWKTMultiPolygonNameHolder>>();
}
}

View File

@ -23,7 +23,7 @@ void registerFunctionGeohashEncode(FunctionFactory & factory);
void registerFunctionGeohashDecode(FunctionFactory & factory);
void registerFunctionGeohashesInBox(FunctionFactory & factory);
void registerFunctionWkt(FunctionFactory & factory);
void registerFunctionReadWkt(FunctionFactory & factory);
void registerFunctionReadWKT(FunctionFactory & factory);
void registerFunctionSvg(FunctionFactory & factory);
#if USE_H3
@ -79,7 +79,7 @@ void registerFunctionsGeo(FunctionFactory & factory)
registerFunctionGeohashDecode(factory);
registerFunctionGeohashesInBox(factory);
registerFunctionWkt(factory);
registerFunctionReadWkt(factory);
registerFunctionReadWKT(factory);
registerFunctionSvg(factory);
#if USE_H3

View File

@ -102,6 +102,7 @@ public:
void registerFunctionSvg(FunctionFactory & factory)
{
factory.registerFunction<FunctionSvg>();
factory.registerAlias("SVG", "svg");
}
}

View File

@ -121,7 +121,7 @@ struct Progress
/** Callback to track the progress of the query.
* Used in IBlockInputStream and Context.
* Used in QueryPipeline and Context.
* The function takes the number of rows in the last block, the number of bytes in the last block.
* Note that the callback can be called from different threads.
*/

View File

@ -279,29 +279,39 @@ ReturnType readIntTextImpl(T & x, ReadBuffer & buf)
{
case '+':
{
if (has_sign || has_number)
/// 123+ or +123+, just stop after 123 or +123.
if (has_number)
goto end;
/// No digits read yet, but we already read sign, like ++, -+.
if (has_sign)
{
if constexpr (throw_exception)
throw ParsingException(
"Cannot parse number with multiple sign (+/-) characters or intermediate sign character",
"Cannot parse number with multiple sign (+/-) characters",
ErrorCodes::CANNOT_PARSE_NUMBER);
else
return ReturnType(false);
}
has_sign = true;
break;
}
case '-':
{
if (has_sign || has_number)
if (has_number)
goto end;
if (has_sign)
{
if constexpr (throw_exception)
throw ParsingException(
"Cannot parse number with multiple sign (+/-) characters or intermediate sign character",
"Cannot parse number with multiple sign (+/-) characters",
ErrorCodes::CANNOT_PARSE_NUMBER);
else
return ReturnType(false);
}
if constexpr (is_signed_v<T>)
negative = true;
else

View File

@ -43,8 +43,6 @@ namespace ErrorCodes
extern const int UNKNOWN_AGGREGATED_DATA_VARIANT;
}
class IBlockOutputStream;
/** Different data structures that can be used for aggregation
* For efficiency, the aggregation data itself is put into the pool.
* Data and pool ownership (states of aggregate functions)

View File

@ -1895,7 +1895,7 @@ void Context::setSystemZooKeeperLogAfterInitializationIfNeeded()
zk.second->setZooKeeperLog(shared->system_logs->zookeeper_log);
}
void Context::initializeKeeperDispatcher(bool start_async) const
void Context::initializeKeeperDispatcher([[maybe_unused]] bool start_async) const
{
#if USE_NURAFT
std::lock_guard lock(shared->keeper_storage_dispatcher_mutex);
@ -2971,8 +2971,12 @@ void Context::setAsynchronousInsertQueue(const std::shared_ptr<AsynchronousInser
shared->async_insert_queue = ptr;
}
void Context::initializeBackgroundExecutors()
void Context::initializeBackgroundExecutorsIfNeeded()
{
auto lock = getLock();
if (is_background_executors_initialized)
return;
const size_t max_merges_and_mutations = getSettingsRef().background_pool_size * getSettingsRef().background_merges_mutations_concurrency_ratio;
/// With this executor we can execute more tasks than threads we have
@ -3019,6 +3023,8 @@ void Context::initializeBackgroundExecutors()
LOG_INFO(shared->log, "Initialized background executor for common operations (e.g. clearing old parts) with num_threads={}, num_tasks={}",
getSettingsRef().background_common_pool_size, getSettingsRef().background_common_pool_size);
is_background_executors_initialized = true;
}

View File

@ -293,6 +293,8 @@ private:
/// A flag, used to distinguish between user query and internal query to a database engine (MaterializedPostgreSQL).
bool is_internal_query = false;
/// Has initializeBackgroundExecutors() method been executed?
bool is_background_executors_initialized = false;
public:
@ -636,13 +638,13 @@ public:
const Settings & getSettingsRef() const { return settings; }
void setProgressCallback(ProgressCallback callback);
/// Used in InterpreterSelectQuery to pass it to the IBlockInputStream.
/// Used in executeQuery() to pass it to the QueryPipeline.
ProgressCallback getProgressCallback() const;
void setFileProgressCallback(FileProgressCallback && callback) { file_progress_callback = callback; }
FileProgressCallback getFileProgressCallback() const { return file_progress_callback; }
/** Set in executeQuery and InterpreterSelectQuery. Then it is used in IBlockInputStream,
/** Set in executeQuery and InterpreterSelectQuery. Then it is used in QueryPipeline,
* to update and monitor information about the total number of resources spent for the query.
*/
void setProcessListElement(QueryStatus * elem);
@ -867,7 +869,7 @@ public:
void setReadTaskCallback(ReadTaskCallback && callback);
/// Background executors related methods
void initializeBackgroundExecutors();
void initializeBackgroundExecutorsIfNeeded();
MergeMutateBackgroundExecutorPtr getMergeMutateExecutor() const;
OrdinaryBackgroundExecutorPtr getMovesExecutor() const;

View File

@ -1,21 +1,23 @@
#include <Interpreters/InterpreterCreateFunctionQuery.h>
#include <stack>
#include <Access/ContextAccess.h>
#include <Parsers/ASTCreateFunctionQuery.h>
#include <Parsers/ASTIdentifier.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/InterpreterCreateFunctionQuery.h>
#include <Interpreters/FunctionNameNormalizer.h>
#include <Interpreters/UserDefinedSQLObjectsLoader.h>
#include <Interpreters/UserDefinedSQLFunctionFactory.h>
#include <stack>
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_IDENTIFIER;
extern const int CANNOT_CREATE_RECURSIVE_FUNCTION;
extern const int UNSUPPORTED_METHOD;
}
@ -31,20 +33,32 @@ BlockIO InterpreterCreateFunctionQuery::execute()
if (!create_function_query)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Expected CREATE FUNCTION query");
auto & user_defined_function_factory = UserDefinedSQLFunctionFactory::instance();
auto & function_name = create_function_query->function_name;
bool if_not_exists = create_function_query->if_not_exists;
bool replace = create_function_query->or_replace;
create_function_query->if_not_exists = false;
create_function_query->or_replace = false;
if (if_not_exists && user_defined_function_factory.tryGet(function_name) != nullptr)
return {};
validateFunction(create_function_query->function_core, function_name);
UserDefinedSQLFunctionFactory::instance().registerFunction(function_name, query_ptr);
user_defined_function_factory.registerFunction(function_name, query_ptr, replace);
if (!persist_function)
if (persist_function)
{
try
{
UserDefinedSQLObjectsLoader::instance().storeObject(current_context, UserDefinedSQLObjectType::Function, function_name, *query_ptr);
UserDefinedSQLObjectsLoader::instance().storeObject(current_context, UserDefinedSQLObjectType::Function, function_name, *query_ptr, replace);
}
catch (Exception & exception)
{
UserDefinedSQLFunctionFactory::instance().unregisterFunction(function_name);
user_defined_function_factory.unregisterFunction(function_name);
exception.addMessage(fmt::format("while storing user defined function {} on disk", backQuote(function_name)));
throw;
}
@ -66,42 +80,9 @@ void InterpreterCreateFunctionQuery::validateFunction(ASTPtr function, const Str
}
ASTPtr function_body = function->as<ASTFunction>()->children.at(0)->children.at(1);
std::unordered_set<String> identifiers_in_body = getIdentifiers(function_body);
for (const auto & identifier : identifiers_in_body)
{
if (!arguments.contains(identifier))
throw Exception(ErrorCodes::UNKNOWN_IDENTIFIER, "Identifier {} does not exist in arguments", backQuote(identifier));
}
validateFunctionRecursiveness(function_body, name);
}
std::unordered_set<String> InterpreterCreateFunctionQuery::getIdentifiers(ASTPtr node)
{
std::unordered_set<String> identifiers;
std::stack<ASTPtr> ast_nodes_to_process;
ast_nodes_to_process.push(node);
while (!ast_nodes_to_process.empty())
{
auto ast_node_to_process = ast_nodes_to_process.top();
ast_nodes_to_process.pop();
for (const auto & child : ast_node_to_process->children)
{
auto identifier_name_opt = tryGetIdentifierName(child);
if (identifier_name_opt)
identifiers.insert(identifier_name_opt.value());
ast_nodes_to_process.push(child);
}
}
return identifiers;
}
void InterpreterCreateFunctionQuery::validateFunctionRecursiveness(ASTPtr node, const String & function_to_create)
{
for (const auto & child : node->children)

View File

@ -22,7 +22,6 @@ public:
private:
static void validateFunction(ASTPtr function, const String & name);
static std::unordered_set<String> getIdentifiers(ASTPtr node);
static void validateFunctionRecursiveness(ASTPtr node, const String & function_to_create);
ASTPtr query_ptr;

View File

@ -18,6 +18,11 @@ BlockIO InterpreterDropFunctionQuery::execute()
FunctionNameNormalizer().visit(query_ptr.get());
auto & drop_function_query = query_ptr->as<ASTDropFunctionQuery &>();
auto & user_defined_functions_factory = UserDefinedSQLFunctionFactory::instance();
if (drop_function_query.if_exists && !user_defined_functions_factory.has(drop_function_query.function_name))
return {};
UserDefinedSQLFunctionFactory::instance().unregisterFunction(drop_function_query.function_name);
UserDefinedSQLObjectsLoader::instance().removeObject(current_context, UserDefinedSQLObjectType::Function, drop_function_query.function_name);

View File

@ -278,7 +278,7 @@ std::unique_ptr<IInterpreter> InterpreterFactory::get(ASTPtr & query, ContextMut
}
else if (query->as<ASTCreateFunctionQuery>())
{
return std::make_unique<InterpreterCreateFunctionQuery>(query, context, false /*is_internal*/);
return std::make_unique<InterpreterCreateFunctionQuery>(query, context, true /*persist_function*/);
}
else if (query->as<ASTDropFunctionQuery>())
{

View File

@ -11,6 +11,11 @@
namespace ProfileEvents
{
std::shared_ptr<DB::DataTypeEnum8> TypeEnum = std::make_shared<DB::DataTypeEnum8>(DB::DataTypeEnum8::Values{
{ "increment", static_cast<Int8>(INCREMENT)},
{ "gauge", static_cast<Int8>(GAUGE)},
});
/// Put implementation here to avoid extra linking dependencies for clickhouse_common_io
void dumpToMapColumn(const Counters::Snapshot & counters, DB::IColumn * column, bool nonzero_only)
{

View File

@ -1,5 +1,6 @@
#pragma once
#include <Common/ProfileEvents.h>
#include <DataTypes/DataTypeEnum.h>
#include <Columns/IColumn.h>
@ -9,4 +10,13 @@ namespace ProfileEvents
/// Dumps profile events to columns Map(String, UInt64)
void dumpToMapColumn(const Counters::Snapshot & counters, DB::IColumn * column, bool nonzero_only = true);
/// This is for ProfileEvents packets.
enum Type : int8_t
{
INCREMENT = 1,
GAUGE = 2,
};
extern std::shared_ptr<DB::DataTypeEnum8> TypeEnum;
}

View File

@ -19,7 +19,7 @@ UserDefinedSQLFunctionFactory & UserDefinedSQLFunctionFactory::instance()
return result;
}
void UserDefinedSQLFunctionFactory::registerFunction(const String & function_name, ASTPtr create_function_query)
void UserDefinedSQLFunctionFactory::registerFunction(const String & function_name, ASTPtr create_function_query, bool replace)
{
if (FunctionFactory::instance().hasNameOrAlias(function_name))
throw Exception(ErrorCodes::FUNCTION_ALREADY_EXISTS, "The function '{}' already exists", function_name);
@ -29,11 +29,17 @@ void UserDefinedSQLFunctionFactory::registerFunction(const String & function_nam
std::lock_guard lock(mutex);
auto [_, inserted] = function_name_to_create_query.emplace(function_name, std::move(create_function_query));
auto [it, inserted] = function_name_to_create_query.emplace(function_name, create_function_query);
if (!inserted)
throw Exception(ErrorCodes::FUNCTION_ALREADY_EXISTS,
"The function name '{}' is not unique",
function_name);
{
if (replace)
it->second = std::move(create_function_query);
else
throw Exception(ErrorCodes::FUNCTION_ALREADY_EXISTS,
"The function name '{}' is not unique",
function_name);
}
}
void UserDefinedSQLFunctionFactory::unregisterFunction(const String & function_name)
@ -77,6 +83,11 @@ ASTPtr UserDefinedSQLFunctionFactory::tryGet(const std::string & function_name)
return it->second;
}
bool UserDefinedSQLFunctionFactory::has(const String & function_name) const
{
return tryGet(function_name) != nullptr;
}
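// Illustrative aside, not part of this patch: registerFunction now supports CREATE OR REPLACE
// semantics -- an existing entry is overwritten only when replace is requested, otherwise the name
// collision stays an error. Standalone sketch of that insert-or-replace policy over a plain map:
#include <cassert>
#include <map>
#include <optional>
#include <string>

class FunctionRegistrySketch
{
public:
    /// Returns false when the name is already taken and replace was not requested.
    bool registerFunction(const std::string & name, std::string create_query, bool replace)
    {
        auto [it, inserted] = create_queries.emplace(name, create_query);
        if (!inserted)
        {
            if (!replace)
                return false;
            it->second = std::move(create_query);
        }
        return true;
    }

    std::optional<std::string> tryGet(const std::string & name) const
    {
        auto it = create_queries.find(name);
        if (it == create_queries.end())
            return std::nullopt;
        return it->second;
    }

private:
    std::map<std::string, std::string> create_queries;
};

int main()
{
    FunctionRegistrySketch registry;
    assert(registry.registerFunction("linear", "CREATE FUNCTION linear AS (x, k, b) -> k*x + b", false));
    assert(!registry.registerFunction("linear", "...", /*replace=*/ false)); /// name is not unique
    assert(registry.registerFunction("linear", "CREATE OR REPLACE FUNCTION linear AS (x) -> x", true));
    assert(registry.tryGet("linear")->find("OR REPLACE") != std::string::npos);
}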
std::vector<std::string> UserDefinedSQLFunctionFactory::getAllRegisteredNames() const
{
std::vector<std::string> registered_names;

View File

@ -10,19 +10,31 @@
namespace DB
{
/// Factory for SQLUserDefinedFunctions
class UserDefinedSQLFunctionFactory : public IHints<1, UserDefinedSQLFunctionFactory>
{
public:
static UserDefinedSQLFunctionFactory & instance();
void registerFunction(const String & function_name, ASTPtr create_function_query);
/** Register a function for function_name in the factory with the specified create_function_query.
* If replace = true and a function with function_name already exists, it is replaced with create_function_query.
* Otherwise an exception is thrown.
*/
void registerFunction(const String & function_name, ASTPtr create_function_query, bool replace);
/// Unregister function for function_name
void unregisterFunction(const String & function_name);
/// Get the create query for function_name. If no function is registered with function_name, an exception is thrown.
ASTPtr get(const String & function_name) const;
/// Get the create query for function_name. If no function is registered with function_name, returns nullptr.
ASTPtr tryGet(const String & function_name) const;
/// Check if a function with function_name is registered.
bool has(const String & function_name) const;
/// Get the names of all registered user defined functions.
std::vector<String> getAllRegisteredNames() const override;
private:

View File

@ -25,6 +25,7 @@ void UserDefinedSQLFunctionMatcher::visit(ASTPtr & ast, Data &)
return;
auto result = tryToReplaceFunction(*function);
if (result)
ast = result;
}
@ -83,9 +84,16 @@ ASTPtr UserDefinedSQLFunctionMatcher::tryToReplaceFunction(const ASTFunction & f
if (identifier_name_opt)
{
auto function_argument_it = identifier_name_to_function_argument.find(*identifier_name_opt);
assert(function_argument_it != identifier_name_to_function_argument.end());
if (function_argument_it == identifier_name_to_function_argument.end())
continue;
auto child_alias = child->tryGetAlias();
child = function_argument_it->second->clone();
if (!child_alias.empty())
child->setAlias(child_alias);
continue;
}

View File

@ -69,7 +69,7 @@ void UserDefinedSQLObjectsLoader::loadUserDefinedObject(ContextPtr context, User
0,
context->getSettingsRef().max_parser_depth);
InterpreterCreateFunctionQuery interpreter(ast, context, true /*is internal*/);
InterpreterCreateFunctionQuery interpreter(ast, context, false /*persist_function*/);
interpreter.execute();
}
}
@ -111,7 +111,7 @@ void UserDefinedSQLObjectsLoader::loadObjects(ContextPtr context)
}
}
void UserDefinedSQLObjectsLoader::storeObject(ContextPtr context, UserDefinedSQLObjectType object_type, const String & object_name, const IAST & ast)
void UserDefinedSQLObjectsLoader::storeObject(ContextPtr context, UserDefinedSQLObjectType object_type, const String & object_name, const IAST & ast, bool replace)
{
if (unlikely(!enable_persistence))
return;
@ -127,7 +127,7 @@ void UserDefinedSQLObjectsLoader::storeObject(ContextPtr context, UserDefinedSQL
}
}
if (std::filesystem::exists(file_path))
if (!replace && std::filesystem::exists(file_path))
throw Exception(ErrorCodes::OBJECT_ALREADY_STORED_ON_DISK, "User defined object {} already stored on disk", backQuote(file_path));
LOG_DEBUG(log, "Storing object {} to file {}", backQuote(object_name), file_path);
@ -135,9 +135,9 @@ void UserDefinedSQLObjectsLoader::storeObject(ContextPtr context, UserDefinedSQL
WriteBufferFromOwnString create_statement_buf;
formatAST(ast, create_statement_buf, false);
writeChar('\n', create_statement_buf);
String create_statement = create_statement_buf.str();
WriteBufferFromFile out(file_path, create_statement.size(), O_WRONLY | O_CREAT | O_EXCL);
WriteBufferFromFile out(file_path, create_statement.size());
writeString(create_statement, out);
out.next();
if (context->getSettingsRef().fsync_metadata)

View File

@ -21,7 +21,7 @@ public:
UserDefinedSQLObjectsLoader();
void loadObjects(ContextPtr context);
void storeObject(ContextPtr context, UserDefinedSQLObjectType object_type, const String & object_name, const IAST & ast);
void storeObject(ContextPtr context, UserDefinedSQLObjectType object_type, const String & object_name, const IAST & ast, bool replace);
void removeObject(ContextPtr context, UserDefinedSQLObjectType object_type, const String & object_name);
/// For ClickHouse local if path is not set we can disable loader.

View File

@ -161,7 +161,7 @@ void loadMetadata(ContextMutablePtr context, const String & default_database_nam
bool create_default_db_if_not_exists = !default_database_name.empty();
bool metadata_dir_for_default_db_already_exists = databases.count(default_database_name);
if (create_default_db_if_not_exists && !metadata_dir_for_default_db_already_exists)
databases.emplace(default_database_name, path + "/" + escapeForFileName(default_database_name));
databases.emplace(default_database_name, std::filesystem::path(path) / escapeForFileName(default_database_name));
TablesLoader::Databases loaded_databases;
for (const auto & [name, db_path] : databases)

View File

@ -12,7 +12,18 @@ ASTPtr ASTCreateFunctionQuery::clone() const
void ASTCreateFunctionQuery::formatImpl(const IAST::FormatSettings & settings, IAST::FormatState & state, IAST::FormatStateStacked frame) const
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << "CREATE FUNCTION " << (settings.hilite ? hilite_none : "");
settings.ostr << (settings.hilite ? hilite_keyword : "") << "CREATE ";
if (or_replace)
settings.ostr << "OR REPLACE ";
settings.ostr << "FUNCTION ";
if (if_not_exists)
settings.ostr << "IF NOT EXISTS ";
settings.ostr << (settings.hilite ? hilite_none : "");
settings.ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(function_name) << (settings.hilite ? hilite_none : "");
settings.ostr << (settings.hilite ? hilite_keyword : "") << " AS " << (settings.hilite ? hilite_none : "");
function_core->formatImpl(settings, state, frame);
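// Illustrative aside, not part of this patch: with the two new flags the query is rendered as
// "CREATE [OR REPLACE] FUNCTION [IF NOT EXISTS] name AS ...". Standalone sketch of the same
// conditional keyword emission (hiliting omitted):
#include <iostream>
#include <sstream>
#include <string>

static std::string formatCreateFunctionSketch(const std::string & name, const std::string & body, bool or_replace, bool if_not_exists)
{
    std::ostringstream out;
    out << "CREATE ";
    if (or_replace)
        out << "OR REPLACE ";
    out << "FUNCTION ";
    if (if_not_exists)
        out << "IF NOT EXISTS ";
    out << name << " AS " << body;
    return out.str();
}

int main()
{
    std::cout << formatCreateFunctionSketch("linear", "(x, k, b) -> k*x + b", true, false) << '\n';
    /// prints: CREATE OR REPLACE FUNCTION linear AS (x, k, b) -> k*x + b
}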

View File

@ -12,6 +12,9 @@ public:
String function_name;
ASTPtr function_core;
bool or_replace = false;
bool if_not_exists = false;
String getID(char) const override { return "CreateFunctionQuery"; }
ASTPtr clone() const override;

View File

@ -12,7 +12,12 @@ ASTPtr ASTDropFunctionQuery::clone() const
void ASTDropFunctionQuery::formatImpl(const IAST::FormatSettings & settings, IAST::FormatState &, IAST::FormatStateStacked) const
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << "DROP FUNCTION " << (settings.hilite ? hilite_none : "");
settings.ostr << (settings.hilite ? hilite_keyword : "") << "DROP FUNCTION ";
if (if_exists)
settings.ostr << "IF EXISTS ";
settings.ostr << (settings.hilite ? hilite_none : "");
settings.ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(function_name) << (settings.hilite ? hilite_none : "");
}

View File

@ -10,6 +10,8 @@ class ASTDropFunctionQuery : public IAST
public:
String function_name;
bool if_exists = false;
String getID(char) const override { return "DropFunctionQuery"; }
ASTPtr clone() const override;

View File

@ -1,10 +1,12 @@
#include <Parsers/ParserCreateFunctionQuery.h>
#include <Parsers/ASTCreateFunctionQuery.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/CommonParsers.h>
#include <Parsers/ExpressionElementParsers.h>
#include <Parsers/ExpressionListParsers.h>
#include <Parsers/ParserCreateFunctionQuery.h>
namespace DB
{
@ -13,6 +15,8 @@ bool ParserCreateFunctionQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Exp
{
ParserKeyword s_create("CREATE");
ParserKeyword s_function("FUNCTION");
ParserKeyword s_or_replace("OR REPLACE");
ParserKeyword s_if_not_exists("IF NOT EXISTS");
ParserIdentifier function_name_p;
ParserKeyword s_as("AS");
ParserLambdaExpression lambda_p;
@ -20,12 +24,21 @@ bool ParserCreateFunctionQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Exp
ASTPtr function_name;
ASTPtr function_core;
bool or_replace = false;
bool if_not_exists = false;
if (!s_create.ignore(pos, expected))
return false;
if (s_or_replace.ignore(pos, expected))
or_replace = true;
if (!s_function.ignore(pos, expected))
return false;
if (!or_replace && s_if_not_exists.ignore(pos, expected))
if_not_exists = true;
if (!function_name_p.parse(pos, function_name, expected))
return false;
@ -40,6 +53,8 @@ bool ParserCreateFunctionQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Exp
create_function_query->function_name = function_name->as<ASTIdentifier &>().name();
create_function_query->function_core = function_core;
create_function_query->or_replace = or_replace;
create_function_query->if_not_exists = if_not_exists;
return true;
}
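// Illustrative aside, not part of this patch: the keyword order accepted by the parser is CREATE,
// then optional OR REPLACE, then FUNCTION, then optional IF NOT EXISTS (only when OR REPLACE is
// absent, so the two modifiers are never combined), then the function name. Standalone sketch of
// that acceptance logic over a pre-tokenized keyword list:
#include <cassert>
#include <string>
#include <vector>

struct CreateFunctionFlagsSketch
{
    bool or_replace = false;
    bool if_not_exists = false;
};

static bool parseCreateFunctionPrefixSketch(const std::vector<std::string> & tokens, size_t & pos, CreateFunctionFlagsSketch & flags)
{
    auto accept = [&](const std::string & keyword)
    {
        if (pos < tokens.size() && tokens[pos] == keyword)
        {
            ++pos;
            return true;
        }
        return false;
    };
    if (!accept("CREATE"))
        return false;
    if (accept("OR"))
    {
        if (!accept("REPLACE"))
            return false;
        flags.or_replace = true;
    }
    if (!accept("FUNCTION"))
        return false;
    if (!flags.or_replace && accept("IF"))
    {
        if (!accept("NOT") || !accept("EXISTS"))
            return false;
        flags.if_not_exists = true;
    }
    return true;
}

int main()
{
    size_t pos = 0;
    CreateFunctionFlagsSketch flags;
    std::vector<std::string> tokens = {"CREATE", "OR", "REPLACE", "FUNCTION", "linear"};
    assert(parseCreateFunctionPrefixSketch(tokens, pos, flags) && flags.or_replace && !flags.if_not_exists);
    assert(tokens[pos] == "linear"); /// the function name is the next token
}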

View File

@ -11,7 +11,10 @@ bool ParserDropFunctionQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expec
{
ParserKeyword s_drop("DROP");
ParserKeyword s_function("FUNCTION");
ParserKeyword s_if_exists("IF EXISTS");
ParserIdentifier function_name_p;
bool if_exists = false;
ASTPtr function_name;
@ -21,10 +24,14 @@ bool ParserDropFunctionQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expec
if (!s_function.ignore(pos, expected))
return false;
if (s_if_exists.ignore(pos, expected))
if_exists = true;
if (!function_name_p.parse(pos, function_name, expected))
return false;
auto drop_function_query = std::make_shared<ASTDropFunctionQuery>();
drop_function_query->if_exists = if_exists;
node = drop_function_query;
drop_function_query->function_name = function_name->as<ASTIdentifier &>().name();

View File

@ -38,7 +38,8 @@ const std::unordered_set<std::string_view> keywords
"IN", "KILL", "QUERY", "SYNC", "ASYNC", "TEST", "BETWEEN", "TRUNCATE", "USER", "ROLE",
"PROFILE", "QUOTA", "POLICY", "ROW", "GRANT", "REVOKE", "OPTION", "ADMIN", "EXCEPT", "REPLACE",
"IDENTIFIED", "HOST", "NAME", "READONLY", "WRITABLE", "PERMISSIVE", "FOR", "RESTRICTIVE", "RANDOMIZED",
"INTERVAL", "LIMITS", "ONLY", "TRACKING", "IP", "REGEXP", "ILIKE", "DICTIONARY"
"INTERVAL", "LIMITS", "ONLY", "TRACKING", "IP", "REGEXP", "ILIKE", "DICTIONARY", "OFFSET",
"TRIM", "LTRIM", "RTRIM", "BOTH", "LEADING", "TRAILING"
};
const std::unordered_set<std::string_view> keep_words
@ -906,7 +907,13 @@ void obfuscateQueries(
/// Write quotes and the obfuscated content inside.
result.write(*token.begin);
obfuscateIdentifier({token.begin + 1, token.size() - 2}, result, obfuscate_map, used_nouns, hash_func);
/// If it is long, just replace it with hash. Long identifiers in queries are usually auto-generated.
if (token.size() > 32)
writeIntText(sipHash64(token.begin + 1, token.size() - 2), result);
else
obfuscateIdentifier({token.begin + 1, token.size() - 2}, result, obfuscate_map, used_nouns, hash_func);
result.write(token.end[-1]);
}
else if (token.type == TokenType::Number)

View File

@ -72,7 +72,8 @@ public:
InputPort & getPort(PortKind kind) { return *std::next(inputs.begin(), kind); }
/// Compatible to IBlockOutputStream interface
/// Compatibility with old interface.
/// TODO: separate formats and processors.
void write(const Block & block);

View File

@ -1,7 +1,6 @@
#include "CapnProtoRowInputFormat.h"
#if USE_CAPNP
#include <Core/Field.h>
#include <IO/ReadBuffer.h>
#include <Interpreters/Context.h>
#include <Formats/FormatFactory.h>
@ -9,198 +8,40 @@
#include <capnp/serialize.h>
#include <capnp/dynamic.h>
#include <capnp/common.h>
#include <base/logger_useful.h>
#include <base/find_symbols.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnTuple.h>
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnLowCardinality.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnDecimal.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeLowCardinality.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_TYPE_OF_FIELD;
extern const int THERE_IS_NO_COLUMN;
extern const int LOGICAL_ERROR;
}
static CapnProtoRowInputFormat::NestedField split(const Block & header, size_t i)
{
CapnProtoRowInputFormat::NestedField field = {{}, i};
// Remove leading dot in field definition, e.g. ".msg" -> "msg"
String name(header.safeGetByPosition(i).name);
if (!name.empty() && name[0] == '.')
name.erase(0, 1);
splitInto<'.', '_'>(field.tokens, name);
return field;
}
static Field convertNodeToField(const capnp::DynamicValue::Reader & value)
{
switch (value.getType())
{
case capnp::DynamicValue::UNKNOWN:
throw Exception("Unknown field type", ErrorCodes::BAD_TYPE_OF_FIELD);
case capnp::DynamicValue::VOID:
return Field();
case capnp::DynamicValue::BOOL:
return value.as<bool>() ? 1u : 0u;
case capnp::DynamicValue::INT:
return value.as<int64_t>();
case capnp::DynamicValue::UINT:
return value.as<uint64_t>();
case capnp::DynamicValue::FLOAT:
return value.as<double>();
case capnp::DynamicValue::TEXT:
{
auto arr = value.as<capnp::Text>();
return String(arr.begin(), arr.size());
}
case capnp::DynamicValue::DATA:
{
auto arr = value.as<capnp::Data>().asChars();
return String(arr.begin(), arr.size());
}
case capnp::DynamicValue::LIST:
{
auto list_value = value.as<capnp::DynamicList>();
Array res(list_value.size());
for (auto i : kj::indices(list_value))
res[i] = convertNodeToField(list_value[i]);
return res;
}
case capnp::DynamicValue::ENUM:
return value.as<capnp::DynamicEnum>().getRaw();
case capnp::DynamicValue::STRUCT:
{
auto struct_value = value.as<capnp::DynamicStruct>();
const auto & fields = struct_value.getSchema().getFields();
Tuple tuple(fields.size());
for (auto i : kj::indices(fields))
tuple[i] = convertNodeToField(struct_value.get(fields[i]));
return tuple;
}
case capnp::DynamicValue::CAPABILITY:
throw Exception("CAPABILITY type not supported", ErrorCodes::BAD_TYPE_OF_FIELD);
case capnp::DynamicValue::ANY_POINTER:
throw Exception("ANY_POINTER type not supported", ErrorCodes::BAD_TYPE_OF_FIELD);
}
return Field();
}
static capnp::StructSchema::Field getFieldOrThrow(capnp::StructSchema node, const std::string & field)
{
KJ_IF_MAYBE(child, node.findFieldByName(field))
return *child;
else
throw Exception("Field " + field + " doesn't exist in schema " + node.getShortDisplayName().cStr(), ErrorCodes::THERE_IS_NO_COLUMN);
}
void CapnProtoRowInputFormat::createActions(const NestedFieldList & sorted_fields, capnp::StructSchema reader)
{
/// Columns in a table can map to fields in Cap'n'Proto or to structs.
/// Store common parents and their tokens in order to backtrack.
std::vector<capnp::StructSchema::Field> parents;
std::vector<std::string> parent_tokens;
capnp::StructSchema cur_reader = reader;
for (const auto & field : sorted_fields)
{
if (field.tokens.empty())
throw Exception("Logical error in CapnProtoRowInputFormat", ErrorCodes::LOGICAL_ERROR);
// Backtrack to common parent
while (field.tokens.size() < parent_tokens.size() + 1
|| !std::equal(parent_tokens.begin(), parent_tokens.end(), field.tokens.begin()))
{
actions.push_back({Action::POP});
parents.pop_back();
parent_tokens.pop_back();
if (parents.empty())
{
cur_reader = reader;
break;
}
else
cur_reader = parents.back().getType().asStruct();
}
// Go forward
while (parent_tokens.size() + 1 < field.tokens.size())
{
const auto & token = field.tokens[parents.size()];
auto node = getFieldOrThrow(cur_reader, token);
if (node.getType().isStruct())
{
// Descend to field structure
parents.emplace_back(node);
parent_tokens.emplace_back(token);
cur_reader = node.getType().asStruct();
actions.push_back({Action::PUSH, node});
}
else if (node.getType().isList())
{
break; // Collect list
}
else
throw Exception("Field " + token + " is neither Struct nor List", ErrorCodes::BAD_TYPE_OF_FIELD);
}
// Read field from the structure
auto node = getFieldOrThrow(cur_reader, field.tokens[parents.size()]);
if (node.getType().isList() && !actions.empty() && actions.back().field == node)
{
// The field list here flattens Nested elements into multiple arrays
// In order to map Nested types in Cap'nProto back, they need to be collected
// Since the field names are sorted, the order of field positions must be preserved
// For example, if the fields are { b @0 :Text, a @1 :Text }, the `a` would come first
// even though its position is second.
auto & columns = actions.back().columns;
auto it = std::upper_bound(columns.cbegin(), columns.cend(), field.pos);
columns.insert(it, field.pos);
}
else
{
actions.push_back({Action::READ, node, {field.pos}});
}
}
}
CapnProtoRowInputFormat::CapnProtoRowInputFormat(ReadBuffer & in_, Block header, Params params_, const FormatSchemaInfo & info)
: IRowInputFormat(std::move(header), in_, std::move(params_)), parser(std::make_shared<SchemaParser>())
CapnProtoRowInputFormat::CapnProtoRowInputFormat(ReadBuffer & in_, Block header, Params params_, const FormatSchemaInfo & info, const FormatSettings & format_settings_)
: IRowInputFormat(std::move(header), in_, std::move(params_))
, parser(std::make_shared<CapnProtoSchemaParser>())
, format_settings(format_settings_)
, column_types(getPort().getHeader().getDataTypes())
, column_names(getPort().getHeader().getNames())
{
// Parse the schema and fetch the root object
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
auto schema = parser->impl.parseDiskFile(info.schemaPath(), info.absoluteSchemaPath(), {});
#pragma GCC diagnostic pop
root = schema.getNested(info.messageName()).asStruct();
/**
* The schema typically consists of fields in various nested structures.
* Here we gather the list of fields and sort them in a way so that fields in the same structure are adjacent,
* and the nesting level doesn't decrease to make traversal easier.
*/
const auto & sample = getPort().getHeader();
NestedFieldList list;
size_t num_columns = sample.columns();
for (size_t i = 0; i < num_columns; ++i)
list.push_back(split(sample, i));
// Order list first by value of strings then by length of string vector.
std::sort(list.begin(), list.end(), [](const NestedField & a, const NestedField & b) { return a.tokens < b.tokens; });
createActions(list, root);
root = parser->getMessageSchema(info);
checkCapnProtoSchemaStructure(root, getPort().getHeader(), format_settings.capn_proto.enum_comparing_mode);
}
kj::Array<capnp::word> CapnProtoRowInputFormat::readMessage()
@ -233,6 +74,191 @@ kj::Array<capnp::word> CapnProtoRowInputFormat::readMessage()
return msg;
}
static void insertSignedInteger(IColumn & column, const DataTypePtr & column_type, Int64 value)
{
switch (column_type->getTypeId())
{
case TypeIndex::Int8:
assert_cast<ColumnInt8 &>(column).insertValue(value);
break;
case TypeIndex::Int16:
assert_cast<ColumnInt16 &>(column).insertValue(value);
break;
case TypeIndex::Int32:
assert_cast<ColumnInt32 &>(column).insertValue(value);
break;
case TypeIndex::Int64:
assert_cast<ColumnInt64 &>(column).insertValue(value);
break;
case TypeIndex::DateTime64:
assert_cast<ColumnDecimal<DateTime64> &>(column).insertValue(value);
break;
default:
throw Exception(ErrorCodes::LOGICAL_ERROR, "Column type is not a signed integer.");
}
}
static void insertUnsignedInteger(IColumn & column, const DataTypePtr & column_type, UInt64 value)
{
switch (column_type->getTypeId())
{
case TypeIndex::UInt8:
assert_cast<ColumnUInt8 &>(column).insertValue(value);
break;
case TypeIndex::Date: [[fallthrough]];
case TypeIndex::UInt16:
assert_cast<ColumnUInt16 &>(column).insertValue(value);
break;
case TypeIndex::DateTime: [[fallthrough]];
case TypeIndex::UInt32:
assert_cast<ColumnUInt32 &>(column).insertValue(value);
break;
case TypeIndex::UInt64:
assert_cast<ColumnUInt64 &>(column).insertValue(value);
break;
default:
throw Exception(ErrorCodes::LOGICAL_ERROR, "Column type is not an unsigned integer.");
}
}
static void insertFloat(IColumn & column, const DataTypePtr & column_type, Float64 value)
{
switch (column_type->getTypeId())
{
case TypeIndex::Float32:
assert_cast<ColumnFloat32 &>(column).insertValue(value);
break;
case TypeIndex::Float64:
assert_cast<ColumnFloat64 &>(column).insertValue(value);
break;
default:
throw Exception(ErrorCodes::LOGICAL_ERROR, "Column type is not a float.");
}
}
template <typename Value>
static void insertString(IColumn & column, Value value)
{
column.insertData(reinterpret_cast<const char *>(value.begin()), value.size());
}
template <typename ValueType>
static void insertEnum(IColumn & column, const DataTypePtr & column_type, const capnp::DynamicEnum & enum_value, FormatSettings::EnumComparingMode enum_comparing_mode)
{
auto enumerant = *kj::_::readMaybe(enum_value.getEnumerant());
auto enum_type = assert_cast<const DataTypeEnum<ValueType> *>(column_type.get());
DataTypePtr nested_type = std::make_shared<DataTypeNumber<ValueType>>();
switch (enum_comparing_mode)
{
case FormatSettings::EnumComparingMode::BY_VALUES:
insertSignedInteger(column, nested_type, Int64(enumerant.getOrdinal()));
return;
case FormatSettings::EnumComparingMode::BY_NAMES:
insertSignedInteger(column, nested_type, Int64(enum_type->getValue(String(enumerant.getProto().getName()))));
return;
case FormatSettings::EnumComparingMode::BY_NAMES_CASE_INSENSITIVE:
{
/// Find the same enum name, compared case-insensitively.
String enum_name = enumerant.getProto().getName();
for (auto & name : enum_type->getAllRegisteredNames())
{
if (compareEnumNames(name, enum_name, enum_comparing_mode))
{
insertSignedInteger(column, nested_type, Int64(enum_type->getValue(name)));
break;
}
}
}
}
}
static void insertValue(IColumn & column, const DataTypePtr & column_type, const capnp::DynamicValue::Reader & value, FormatSettings::EnumComparingMode enum_comparing_mode)
{
if (column_type->lowCardinality())
{
auto & lc_column = assert_cast<ColumnLowCardinality &>(column);
auto tmp_column = lc_column.getDictionary().getNestedColumn()->cloneEmpty();
auto dict_type = assert_cast<const DataTypeLowCardinality *>(column_type.get())->getDictionaryType();
insertValue(*tmp_column, dict_type, value, enum_comparing_mode);
lc_column.insertFromFullColumn(*tmp_column, 0);
return;
}
switch (value.getType())
{
case capnp::DynamicValue::Type::INT:
insertSignedInteger(column, column_type, value.as<Int64>());
break;
case capnp::DynamicValue::Type::UINT:
insertUnsignedInteger(column, column_type, value.as<UInt64>());
break;
case capnp::DynamicValue::Type::FLOAT:
insertFloat(column, column_type, value.as<Float64>());
break;
case capnp::DynamicValue::Type::BOOL:
insertUnsignedInteger(column, column_type, UInt64(value.as<bool>()));
break;
case capnp::DynamicValue::Type::DATA:
insertString(column, value.as<capnp::Data>());
break;
case capnp::DynamicValue::Type::TEXT:
insertString(column, value.as<capnp::Text>());
break;
case capnp::DynamicValue::Type::ENUM:
if (column_type->getTypeId() == TypeIndex::Enum8)
insertEnum<Int8>(column, column_type, value.as<capnp::DynamicEnum>(), enum_comparing_mode);
else
insertEnum<Int16>(column, column_type, value.as<capnp::DynamicEnum>(), enum_comparing_mode);
break;
case capnp::DynamicValue::LIST:
{
auto list_value = value.as<capnp::DynamicList>();
auto & column_array = assert_cast<ColumnArray &>(column);
auto & offsets = column_array.getOffsets();
offsets.push_back(offsets.back() + list_value.size());
auto & nested_column = column_array.getData();
auto nested_type = assert_cast<const DataTypeArray *>(column_type.get())->getNestedType();
for (const auto & nested_value : list_value)
insertValue(nested_column, nested_type, nested_value, enum_comparing_mode);
break;
}
case capnp::DynamicValue::Type::STRUCT:
{
auto struct_value = value.as<capnp::DynamicStruct>();
if (column_type->isNullable())
{
auto & nullable_column = assert_cast<ColumnNullable &>(column);
auto field = *kj::_::readMaybe(struct_value.which());
if (field.getType().isVoid())
nullable_column.insertDefault();
else
{
auto & nested_column = nullable_column.getNestedColumn();
auto nested_type = assert_cast<const DataTypeNullable *>(column_type.get())->getNestedType();
auto nested_value = struct_value.get(field);
insertValue(nested_column, nested_type, nested_value, enum_comparing_mode);
nullable_column.getNullMapData().push_back(0);
}
}
else
{
auto & tuple_column = assert_cast<ColumnTuple &>(column);
const auto * tuple_type = assert_cast<const DataTypeTuple *>(column_type.get());
for (size_t i = 0; i != tuple_column.tupleSize(); ++i)
insertValue(
tuple_column.getColumn(i),
tuple_type->getElements()[i],
struct_value.get(tuple_type->getElementNames()[i]),
enum_comparing_mode);
}
break;
}
default:
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected CapnProto value type.");
}
}
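// Illustrative aside, not part of this patch: for LowCardinality columns insertValue above first
// materialises the value into a temporary full column and inserts it through the dictionary
// (insertFromFullColumn), so repeated values share one dictionary slot. Standalone sketch of the
// same dictionary-plus-indices idea for strings:
#include <cassert>
#include <string>
#include <unordered_map>
#include <vector>

class LowCardinalityStringSketch
{
public:
    /// Insert a value: reuse its dictionary slot if the value was seen before.
    void insert(const std::string & value)
    {
        auto [it, inserted] = value_to_index.emplace(value, dictionary.size());
        if (inserted)
            dictionary.push_back(value);
        indices.push_back(it->second);
    }

    const std::string & operator[](size_t row) const { return dictionary[indices[row]]; }
    size_t dictionarySize() const { return dictionary.size(); }

private:
    std::vector<std::string> dictionary;
    std::vector<size_t> indices;
    std::unordered_map<std::string, size_t> value_to_index;
};

int main()
{
    LowCardinalityStringSketch column;
    for (const auto * value : {"GET", "POST", "GET", "GET"})
        column.insert(value);
    assert(column.dictionarySize() == 2);
    assert(column[2] == "GET");
}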
bool CapnProtoRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &)
{
if (in->eof())
@ -245,51 +271,12 @@ bool CapnProtoRowInputFormat::readRow(MutableColumns & columns, RowReadExtension
#else
capnp::FlatArrayMessageReader msg(array);
#endif
std::vector<capnp::DynamicStruct::Reader> stack;
stack.push_back(msg.getRoot<capnp::DynamicStruct>(root));
for (auto action : actions)
auto root_reader = msg.getRoot<capnp::DynamicStruct>(root);
for (size_t i = 0; i != columns.size(); ++i)
{
switch (action.type)
{
case Action::READ:
{
Field value = convertNodeToField(stack.back().get(action.field));
if (action.columns.size() > 1)
{
// Nested columns must be flattened into several arrays
// e.g. Array(Tuple(x ..., y ...)) -> Array(x ...), Array(y ...)
const auto & collected = DB::get<const Array &>(value);
size_t size = collected.size();
// The flattened array contains an array of a part of the nested tuple
Array flattened(size);
for (size_t column_index = 0; column_index < action.columns.size(); ++column_index)
{
// Populate array with a single tuple elements
for (size_t off = 0; off < size; ++off)
{
const auto & tuple = DB::get<const Tuple &>(collected[off]);
flattened[off] = tuple[column_index];
}
auto & col = columns[action.columns[column_index]];
col->insert(flattened);
}
}
else
{
auto & col = columns[action.columns[0]];
col->insert(value);
}
break;
}
case Action::POP:
stack.pop_back();
break;
case Action::PUSH:
stack.push_back(stack.back().get(action.field).as<capnp::DynamicStruct>());
break;
}
auto value = getReaderByColumnName(root_reader, column_names[i]);
insertValue(*columns[i], column_types[i], value, format_settings.capn_proto.enum_comparing_mode);
}
return true;
@ -302,8 +289,7 @@ void registerInputFormatCapnProto(FormatFactory & factory)
[](ReadBuffer & buf, const Block & sample, IRowInputFormat::Params params, const FormatSettings & settings)
{
return std::make_shared<CapnProtoRowInputFormat>(buf, sample, std::move(params),
FormatSchemaInfo(settings.schema.format_schema, "CapnProto", true,
settings.schema.is_server, settings.schema.format_schema_path));
FormatSchemaInfo(settings, "CapnProto", true), settings);
});
}

View File

@ -4,8 +4,8 @@
#if USE_CAPNP
#include <Core/Block.h>
#include <Formats/CapnProtoUtils.h>
#include <Processors/Formats/IRowInputFormat.h>
#include <capnp/schema-parser.h>
namespace DB
{
@ -22,18 +22,7 @@ class ReadBuffer;
class CapnProtoRowInputFormat : public IRowInputFormat
{
public:
struct NestedField
{
std::vector<std::string> tokens;
size_t pos;
};
using NestedFieldList = std::vector<NestedField>;
/** schema_dir - base path for schema files
* schema_file - location of the capnproto schema, e.g. "schema.capnp"
* root_object - name to the root object, e.g. "Message"
*/
CapnProtoRowInputFormat(ReadBuffer & in_, Block header, Params params_, const FormatSchemaInfo & info);
CapnProtoRowInputFormat(ReadBuffer & in_, Block header, Params params_, const FormatSchemaInfo & info, const FormatSettings & format_settings_);
String getName() const override { return "CapnProtoRowInputFormat"; }
@ -42,34 +31,11 @@ public:
private:
kj::Array<capnp::word> readMessage();
// Build a traversal plan from a sorted list of fields
void createActions(const NestedFieldList & sorted_fields, capnp::StructSchema reader);
/* Action for state machine for traversing nested structures. */
using BlockPositionList = std::vector<size_t>;
struct Action
{
enum Type { POP, PUSH, READ };
Type type{};
capnp::StructSchema::Field field{};
BlockPositionList columns{};
};
// Wrapper for classes that could throw in destructor
// https://github.com/capnproto/capnproto/issues/553
template <typename T>
struct DestructorCatcher
{
T impl;
template <typename ... Arg>
DestructorCatcher(Arg && ... args) : impl(kj::fwd<Arg>(args)...) {}
~DestructorCatcher() noexcept try { } catch (...) { return; }
};
using SchemaParser = DestructorCatcher<capnp::SchemaParser>;
std::shared_ptr<SchemaParser> parser;
std::shared_ptr<CapnProtoSchemaParser> parser;
capnp::StructSchema root;
std::vector<Action> actions;
const FormatSettings format_settings;
DataTypes column_types;
Names column_names;
};
}

View File

@ -0,0 +1,268 @@
#include <Processors/Formats/Impl/CapnProtoRowOutputFormat.h>
#if USE_CAPNP
#include <Formats/CapnProtoUtils.h>
#include <Formats/FormatSettings.h>
#include <IO/WriteBuffer.h>
#include <capnp/dynamic.h>
#include <capnp/serialize-packed.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnTuple.h>
#include <Columns/ColumnLowCardinality.h>
#include <Columns/ColumnDecimal.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeLowCardinality.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
CapnProtoOutputStream::CapnProtoOutputStream(WriteBuffer & out_) : out(out_)
{
}
void CapnProtoOutputStream::write(const void * buffer, size_t size)
{
out.write(reinterpret_cast<const char *>(buffer), size);
}
CapnProtoRowOutputFormat::CapnProtoRowOutputFormat(
WriteBuffer & out_,
const Block & header_,
const RowOutputFormatParams & params_,
const FormatSchemaInfo & info,
const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_, params_), column_names(header_.getNames()), column_types(header_.getDataTypes()), output_stream(std::make_unique<CapnProtoOutputStream>(out_)), format_settings(format_settings_)
{
schema = schema_parser.getMessageSchema(info);
checkCapnProtoSchemaStructure(schema, getPort(PortKind::Main).getHeader(), format_settings.capn_proto.enum_comparing_mode);
}
template <typename EnumValue>
static capnp::DynamicEnum getDynamicEnum(
const ColumnPtr & column,
const DataTypePtr & data_type,
size_t row_num,
const capnp::EnumSchema & enum_schema,
FormatSettings::EnumComparingMode mode)
{
const auto * enum_data_type = assert_cast<const DataTypeEnum<EnumValue> *>(data_type.get());
EnumValue enum_value = column->getInt(row_num);
if (mode == FormatSettings::EnumComparingMode::BY_VALUES)
return capnp::DynamicEnum(enum_schema, enum_value);
auto enum_name = enum_data_type->getNameForValue(enum_value);
for (const auto enumerant : enum_schema.getEnumerants())
{
if (compareEnumNames(String(enum_name), enumerant.getProto().getName(), mode))
return capnp::DynamicEnum(enumerant);
}
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot convert CLickHouse Enum value to CapnProto Enum");
}
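/// Illustrative example (the schema below is hypothetical, not taken from this patch): for a
/// ClickHouse column of type Enum8('first' = 1, 'second' = 2) and a CapnProto enum
/// { first @0; second @1; }, BY_VALUES builds the DynamicEnum straight from the stored value,
/// so 1 selects the enumerant with ordinal 1 ("second"), while the name-based modes resolve
/// getNameForValue(1) == "first" and pick the enumerant whose name matches via compareEnumNames().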
static capnp::DynamicValue::Builder initStructFieldBuilder(const ColumnPtr & column, size_t row_num, capnp::DynamicStruct::Builder & struct_builder, capnp::StructSchema::Field field)
{
if (const auto * array_column = checkAndGetColumn<ColumnArray>(*column))
{
size_t size = array_column->getOffsets()[row_num] - array_column->getOffsets()[row_num - 1];
return struct_builder.init(field, size);
}
if (field.getType().isStruct())
return struct_builder.init(field);
return struct_builder.get(field);
}
static std::optional<capnp::DynamicValue::Reader> convertToDynamicValue(
const ColumnPtr & column,
const DataTypePtr & data_type,
size_t row_num,
capnp::DynamicValue::Builder builder,
FormatSettings::EnumComparingMode enum_comparing_mode,
std::vector<std::unique_ptr<String>> & temporary_text_data_storage)
{
/// Here we don't do any type validation, because it was already done in the CapnProtoRowOutputFormat constructor.
if (data_type->lowCardinality())
{
const auto * lc_column = assert_cast<const ColumnLowCardinality *>(column.get());
const auto & dict_type = assert_cast<const DataTypeLowCardinality *>(data_type.get())->getDictionaryType();
size_t index = lc_column->getIndexAt(row_num);
return convertToDynamicValue(lc_column->getDictionary().getNestedColumn(), dict_type, index, builder, enum_comparing_mode, temporary_text_data_storage);
}
switch (builder.getType())
{
case capnp::DynamicValue::Type::INT:
/// We allow outputting DateTime64 as Int64.
if (WhichDataType(data_type).isDateTime64())
return capnp::DynamicValue::Reader(assert_cast<const ColumnDecimal<DateTime64> *>(column.get())->getElement(row_num));
return capnp::DynamicValue::Reader(column->getInt(row_num));
case capnp::DynamicValue::Type::UINT:
return capnp::DynamicValue::Reader(column->getUInt(row_num));
case capnp::DynamicValue::Type::BOOL:
return capnp::DynamicValue::Reader(column->getBool(row_num));
case capnp::DynamicValue::Type::FLOAT:
return capnp::DynamicValue::Reader(column->getFloat64(row_num));
case capnp::DynamicValue::Type::ENUM:
{
auto enum_schema = builder.as<capnp::DynamicEnum>().getSchema();
if (data_type->getTypeId() == TypeIndex::Enum8)
return capnp::DynamicValue::Reader(
getDynamicEnum<Int8>(column, data_type, row_num, enum_schema, enum_comparing_mode));
return capnp::DynamicValue::Reader(
getDynamicEnum<Int16>(column, data_type, row_num, enum_schema, enum_comparing_mode));
}
case capnp::DynamicValue::Type::DATA:
{
auto data = column->getDataAt(row_num);
return capnp::DynamicValue::Reader(capnp::Data::Reader(reinterpret_cast<const kj::byte *>(data.data), data.size));
}
case capnp::DynamicValue::Type::TEXT:
{
/// Data of the TEXT type should be null-terminated, but ClickHouse String data may not be.
/// To make the data null-terminated we have to copy it into a temporary String object, but
/// capnp::Text::Reader works only with a pointer to the data and its size, so we must
/// guarantee that the new String object outlives the capnp::Text::Reader. To do this we
/// store the new String object in a temporary storage, passed into this function by
/// reference. We use unique_ptr<String> instead of plain String to avoid pointer
/// invalidation on vector reallocation.
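/// (Assumed rationale, for illustration only: with a plain std::vector<String>, a later
/// push_back could reallocate and move the stored strings, dangling any capnp::Text::Reader
/// built from their data(); the unique_ptr indirection keeps each String at a stable heap
/// address until the message is written.)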
temporary_text_data_storage.push_back(std::make_unique<String>(column->getDataAt(row_num)));
auto & data = temporary_text_data_storage.back();
return capnp::DynamicValue::Reader(capnp::Text::Reader(data->data(), data->size()));
}
case capnp::DynamicValue::Type::STRUCT:
{
auto struct_builder = builder.as<capnp::DynamicStruct>();
auto nested_struct_schema = struct_builder.getSchema();
/// A struct can represent either a Tuple or a Nullable (a named union with two fields).
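/// Illustrative sketch of the assumed mapping (field names are hypothetical): a ClickHouse
/// Nullable(UInt64) column corresponds to a CapnProto named union such as
///     nullableValue :union { null @0 :Void; value @1 :UInt64; }
/// where the Void member encodes NULL and the other member carries the value; the isVoid()
/// checks below pick the correct member regardless of declaration order.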
if (data_type->isNullable())
{
const auto * nullable_type = assert_cast<const DataTypeNullable *>(data_type.get());
const auto * nullable_column = assert_cast<const ColumnNullable *>(column.get());
auto fields = nested_struct_schema.getUnionFields();
if (nullable_column->isNullAt(row_num))
{
auto null_field = fields[0].getType().isVoid() ? fields[0] : fields[1];
struct_builder.set(null_field, capnp::Void());
}
else
{
auto value_field = fields[0].getType().isVoid() ? fields[1] : fields[0];
struct_builder.clear(value_field);
const auto & nested_column = nullable_column->getNestedColumnPtr();
auto value_builder = initStructFieldBuilder(nested_column, row_num, struct_builder, value_field);
auto value = convertToDynamicValue(nested_column, nullable_type->getNestedType(), row_num, value_builder, enum_comparing_mode, temporary_text_data_storage);
if (value)
struct_builder.set(value_field, std::move(*value));
}
}
else
{
const auto * tuple_data_type = assert_cast<const DataTypeTuple *>(data_type.get());
auto nested_types = tuple_data_type->getElements();
const auto & nested_columns = assert_cast<const ColumnTuple *>(column.get())->getColumns();
for (const auto & name : tuple_data_type->getElementNames())
{
auto pos = tuple_data_type->getPositionByName(name);
auto field_builder
= initStructFieldBuilder(nested_columns[pos], row_num, struct_builder, nested_struct_schema.getFieldByName(name));
auto value = convertToDynamicValue(nested_columns[pos], nested_types[pos], row_num, field_builder, enum_comparing_mode, temporary_text_data_storage);
if (value)
struct_builder.set(name, std::move(*value));
}
}
return std::nullopt;
}
case capnp::DynamicValue::Type::LIST:
{
auto list_builder = builder.as<capnp::DynamicList>();
const auto * array_column = assert_cast<const ColumnArray *>(column.get());
const auto & nested_column = array_column->getDataPtr();
const auto & nested_type = assert_cast<const DataTypeArray *>(data_type.get())->getNestedType();
const auto & offsets = array_column->getOffsets();
auto offset = offsets[row_num - 1];
size_t size = offsets[row_num] - offset;
const auto * nested_array_column = checkAndGetColumn<ColumnArray>(*nested_column);
for (size_t i = 0; i != size; ++i)
{
capnp::DynamicValue::Builder value_builder;
/// For nested arrays we need to initialize the nested list builder.
if (nested_array_column)
{
const auto & nested_offset = nested_array_column->getOffsets();
size_t nested_array_size = nested_offset[offset + i] - nested_offset[offset + i - 1];
value_builder = list_builder.init(i, nested_array_size);
}
else
value_builder = list_builder[i];
auto value = convertToDynamicValue(nested_column, nested_type, offset + i, value_builder, enum_comparing_mode, temporary_text_data_storage);
if (value)
list_builder.set(i, std::move(*value));
}
return std::nullopt;
}
default:
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected CapnProto type.");
}
}
void CapnProtoRowOutputFormat::write(const Columns & columns, size_t row_num)
{
capnp::MallocMessageBuilder message;
/// Temporary storage for data that will be output in fields with the CapnProto TEXT type.
/// See comment in convertToDynamicValue() for more details.
std::vector<std::unique_ptr<String>> temporary_text_data_storage;
capnp::DynamicStruct::Builder root = message.initRoot<capnp::DynamicStruct>(schema);
for (size_t i = 0; i != columns.size(); ++i)
{
auto [struct_builder, field] = getStructBuilderAndFieldByColumnName(root, column_names[i]);
auto field_builder = initStructFieldBuilder(columns[i], row_num, struct_builder, field);
auto value = convertToDynamicValue(columns[i], column_types[i], row_num, field_builder, format_settings.capn_proto.enum_comparing_mode, temporary_text_data_storage);
if (value)
struct_builder.set(field, *value);
}
capnp::writeMessage(*output_stream, message);
}
void registerOutputFormatCapnProto(FormatFactory & factory)
{
factory.registerOutputFormat("CapnProto", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & format_settings)
{
return std::make_shared<CapnProtoRowOutputFormat>(buf, sample, params, FormatSchemaInfo(format_settings, "CapnProto", true), format_settings);
});
}
}
#else
namespace DB
{
class FormatFactory;
void registerOutputFormatCapnProto(FormatFactory &) {}
}
#endif // USE_CAPNP

View File

@ -0,0 +1,53 @@
#pragma once
#include "config_formats.h"
#if USE_CAPNP
#include <Processors/Formats/IRowOutputFormat.h>
#include <Formats/FormatSchemaInfo.h>
#include <Formats/CapnProtoUtils.h>
#include <capnp/schema.h>
#include <capnp/dynamic.h>
#include <kj/io.h>
namespace DB
{
class CapnProtoOutputStream : public kj::OutputStream
{
public:
CapnProtoOutputStream(WriteBuffer & out_);
void write(const void * buffer, size_t size) override;
private:
WriteBuffer & out;
};
class CapnProtoRowOutputFormat : public IRowOutputFormat
{
public:
CapnProtoRowOutputFormat(
WriteBuffer & out_,
const Block & header_,
const RowOutputFormatParams & params_,
const FormatSchemaInfo & info,
const FormatSettings & format_settings_);
String getName() const override { return "CapnProtoRowOutputFormat"; }
void write(const Columns & columns, size_t row_num) override;
void writeField(const IColumn &, const ISerialization &, size_t) override { }
private:
Names column_names;
DataTypes column_types;
capnp::StructSchema schema;
std::unique_ptr<CapnProtoOutputStream> output_stream;
const FormatSettings format_settings;
CapnProtoSchemaParser schema_parser;
};
}
#endif // USE_CAPNP

View File

@ -67,8 +67,7 @@ void registerInputFormatProtobuf(FormatFactory & factory)
const FormatSettings & settings)
{
return std::make_shared<ProtobufRowInputFormat>(buf, sample, std::move(params),
FormatSchemaInfo(settings.schema.format_schema, "Protobuf", true,
settings.schema.is_server, settings.schema.format_schema_path),
FormatSchemaInfo(settings, "Protobuf", true),
with_length_delimiter);
});
}

View File

@ -64,9 +64,7 @@ void registerOutputFormatProtobuf(FormatFactory & factory)
{
return std::make_shared<ProtobufRowOutputFormat>(
buf, header, params,
FormatSchemaInfo(settings.schema.format_schema, "Protobuf",
true, settings.schema.is_server,
settings.schema.format_schema_path),
FormatSchemaInfo(settings, "Protobuf", true),
settings,
with_length_delimiter);
});

View File

@ -9,6 +9,7 @@
#include <Processors/Sources/NullSource.h>
#include <Processors/Merges/AggregatingSortedTransform.h>
#include <Processors/Merges/CollapsingSortedTransform.h>
#include <Processors/Merges/GraphiteRollupSortedTransform.h>
#include <Processors/Merges/MergingSortedTransform.h>
#include <Processors/Merges/ReplacingSortedTransform.h>
#include <Processors/Merges/SummingSortedTransform.h>
@ -506,38 +507,39 @@ static void addMergingFinal(
const auto & header = pipe.getHeader();
size_t num_outputs = pipe.numOutputPorts();
auto now = time(nullptr);
auto get_merging_processor = [&]() -> MergingTransformPtr
{
switch (merging_params.mode)
{
case MergeTreeData::MergingParams::Ordinary:
{
return std::make_shared<MergingSortedTransform>(header, num_outputs,
sort_description, max_block_size);
}
sort_description, max_block_size);
case MergeTreeData::MergingParams::Collapsing:
return std::make_shared<CollapsingSortedTransform>(header, num_outputs,
sort_description, merging_params.sign_column, true, max_block_size);
sort_description, merging_params.sign_column, true, max_block_size);
case MergeTreeData::MergingParams::Summing:
return std::make_shared<SummingSortedTransform>(header, num_outputs,
sort_description, merging_params.columns_to_sum, partition_key_columns, max_block_size);
sort_description, merging_params.columns_to_sum, partition_key_columns, max_block_size);
case MergeTreeData::MergingParams::Aggregating:
return std::make_shared<AggregatingSortedTransform>(header, num_outputs,
sort_description, max_block_size);
sort_description, max_block_size);
case MergeTreeData::MergingParams::Replacing:
return std::make_shared<ReplacingSortedTransform>(header, num_outputs,
sort_description, merging_params.version_column, max_block_size);
sort_description, merging_params.version_column, max_block_size);
case MergeTreeData::MergingParams::VersionedCollapsing:
return std::make_shared<VersionedCollapsingTransform>(header, num_outputs,
sort_description, merging_params.sign_column, max_block_size);
sort_description, merging_params.sign_column, max_block_size);
case MergeTreeData::MergingParams::Graphite:
throw Exception("GraphiteMergeTree doesn't support FINAL", ErrorCodes::LOGICAL_ERROR);
return std::make_shared<GraphiteRollupSortedTransform>(header, num_outputs,
sort_description, max_block_size, merging_params.graphite_params, now);
}
__builtin_unreachable();

View File

@ -69,8 +69,7 @@ void SourceWithProgress::work()
}
}
/// Aggregated copy-paste from IBlockInputStream::progressImpl.
/// Most of this must be done in PipelineExecutor outside. Now it's done for compatibility with IBlockInputStream.
/// TODO: Most of this must be done in PipelineExecutor outside.
void SourceWithProgress::progress(const Progress & value)
{
was_progress_called = true;
@ -135,14 +134,12 @@ void SourceWithProgress::progress(const Progress & value)
if (last_profile_events_update_time + profile_events_update_period_microseconds < total_elapsed_microseconds)
{
/// Should be done in PipelineExecutor.
/// It is here for compatibility with IBlockInputsStream.
/// TODO: Should be done in PipelineExecutor.
CurrentThread::updatePerformanceCounters();
last_profile_events_update_time = total_elapsed_microseconds;
}
/// Should be done in PipelineExecutor.
/// It is here for compatibility with IBlockInputsStream.
/// TODO: Should be done in PipelineExecutor.
limits.speed_limits.throttle(progress.read_rows, progress.read_bytes, total_rows, total_elapsed_microseconds);
if (quota && limits.mode == LimitsMode::LIMITS_TOTAL)

View File

@ -12,7 +12,7 @@ class Block;
class ReadBuffer;
class WriteBuffer;
/// Information for profiling. See IBlockInputStream.h
/// Information for profiling. See SourceWithProgress.h
struct ProfileInfo
{
bool started = false;

View File

@ -129,7 +129,6 @@ public:
void setLeafLimits(const SizeLimits & limits) { pipe.setLeafLimits(limits); }
void setQuota(const std::shared_ptr<const EnabledQuota> & quota) { pipe.setQuota(quota); }
/// For compatibility with IBlockInputStream.
void setProgressCallback(const ProgressCallback & callback);
void setProcessListElement(QueryStatus * elem);

View File

@ -30,6 +30,7 @@
#include <Interpreters/InternalTextLogsQueue.h>
#include <Interpreters/OpenTelemetrySpanLog.h>
#include <Interpreters/Session.h>
#include <Interpreters/ProfileEventsExt.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/MergeTreeDataPartUUID.h>
#include <Storages/StorageS3Cluster.h>
@ -831,12 +832,6 @@ namespace
{
using namespace ProfileEvents;
enum ProfileEventTypes : int8_t
{
INCREMENT = 1,
GAUGE = 2,
};
constexpr size_t NAME_COLUMN_INDEX = 4;
constexpr size_t VALUE_COLUMN_INDEX = 5;
@ -879,7 +874,7 @@ namespace
columns[i++]->insertData(host_name.data(), host_name.size());
columns[i++]->insert(UInt64(snapshot.current_time));
columns[i++]->insert(UInt64{snapshot.thread_id});
columns[i++]->insert(ProfileEventTypes::INCREMENT);
columns[i++]->insert(ProfileEvents::Type::INCREMENT);
}
}
@ -893,7 +888,7 @@ namespace
columns[i++]->insertData(host_name.data(), host_name.size());
columns[i++]->insert(UInt64(snapshot.current_time));
columns[i++]->insert(UInt64{snapshot.thread_id});
columns[i++]->insert(ProfileEventTypes::GAUGE);
columns[i++]->insert(ProfileEvents::Type::GAUGE);
columns[i++]->insertData(MemoryTracker::USAGE_EVENT_NAME, strlen(MemoryTracker::USAGE_EVENT_NAME));
columns[i++]->insert(snapshot.memory_usage);
@ -907,18 +902,11 @@ void TCPHandler::sendProfileEvents()
if (client_tcp_protocol_version < DBMS_MIN_PROTOCOL_VERSION_WITH_PROFILE_EVENTS)
return;
auto profile_event_type = std::make_shared<DataTypeEnum8>(
DataTypeEnum8::Values
{
{ "increment", static_cast<Int8>(INCREMENT)},
{ "gauge", static_cast<Int8>(GAUGE)},
});
NamesAndTypesList column_names_and_types = {
{ "host_name", std::make_shared<DataTypeString>() },
{ "current_time", std::make_shared<DataTypeDateTime>() },
{ "thread_id", std::make_shared<DataTypeUInt64>() },
{ "type", profile_event_type },
{ "type", ProfileEvents::TypeEnum },
{ "name", std::make_shared<DataTypeString>() },
{ "value", std::make_shared<DataTypeUInt64>() },
};
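/// Illustrative row for this block (values are invented): "host-1", 2021-10-20 12:00:00, 1234,
/// 'increment', "SelectedRows", 42; that is, host name, snapshot time, thread id, event type,
/// counter name and counter value.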

View File

@ -205,6 +205,8 @@ MergeTreeData::MergeTreeData(
, background_operations_assignee(*this, BackgroundJobsAssignee::Type::DataProcessing, getContext())
, background_moves_assignee(*this, BackgroundJobsAssignee::Type::Moving, getContext())
{
context_->getGlobalContext()->initializeBackgroundExecutorsIfNeeded();
const auto settings = getSettings();
allow_nullable_key = attach || settings->allow_nullable_key;
@ -4470,16 +4472,6 @@ Block MergeTreeData::getMinMaxCountProjectionBlock(
}
size_t pos = 0;
if (!primary_key_max_column_name.empty())
{
const auto & primary_key_column = *part->index[0];
auto primary_key_column_size = primary_key_column.size();
auto & min_column = assert_cast<ColumnAggregateFunction &>(*minmax_count_columns[pos++]);
auto & max_column = assert_cast<ColumnAggregateFunction &>(*minmax_count_columns[pos++]);
insert(min_column, primary_key_column[0]);
insert(max_column, primary_key_column[primary_key_column_size - 1]);
}
size_t minmax_idx_size = part->minmax_idx->hyperrectangle.size();
for (size_t i = 0; i < minmax_idx_size; ++i)
{
@ -4490,6 +4482,16 @@ Block MergeTreeData::getMinMaxCountProjectionBlock(
insert(max_column, range.right);
}
if (!primary_key_max_column_name.empty())
{
const auto & primary_key_column = *part->index[0];
auto primary_key_column_size = primary_key_column.size();
auto & min_column = assert_cast<ColumnAggregateFunction &>(*minmax_count_columns[pos++]);
auto & max_column = assert_cast<ColumnAggregateFunction &>(*minmax_count_columns[pos++]);
insert(min_column, primary_key_column[0]);
insert(max_column, primary_key_column[primary_key_column_size - 1]);
}
{
auto & column = assert_cast<ColumnAggregateFunction &>(*minmax_count_columns.back());
auto func = column.getAggregateFunction();

View File

@ -406,6 +406,7 @@ public:
|| merging_params.mode == MergingParams::Summing
|| merging_params.mode == MergingParams::Aggregating
|| merging_params.mode == MergingParams::Replacing
|| merging_params.mode == MergingParams::Graphite
|| merging_params.mode == MergingParams::VersionedCollapsing;
}

View File

@ -160,9 +160,10 @@ size_t MergeTreeReaderCompact::readRows(size_t from_mark, bool continue_reading,
readData(column_from_part, column, from_mark, *column_positions[pos], rows_to_read, read_only_offsets[pos]);
size_t read_rows_in_column = column->size() - column_size_before_reading;
if (read_rows_in_column < rows_to_read)
throw Exception("Cannot read all data in MergeTreeReaderCompact. Rows read: " + toString(read_rows_in_column) +
". Rows expected: " + toString(rows_to_read) + ".", ErrorCodes::CANNOT_READ_ALL_DATA);
if (read_rows_in_column != rows_to_read)
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA,
"Cannot read all data in MergeTreeReaderCompact. Rows read: {}. Rows expected: {}.",
read_rows_in_column, rows_to_read);
}
catch (Exception & e)
{

View File

@ -10,6 +10,7 @@ namespace DB
namespace ErrorCodes
{
extern const int ARGUMENT_OUT_OF_BOUND;
extern const int CANNOT_READ_ALL_DATA;
}
@ -76,6 +77,10 @@ MergeTreeReaderStream::MergeTreeReaderStream(
if (max_mark_range_bytes != 0)
read_settings = read_settings.adjustBufferSize(max_mark_range_bytes);
/// An empty buffer cannot make progress: a zero-sized read buffer would never return any data.
if (!read_settings.local_fs_buffer_size || !read_settings.remote_fs_buffer_size)
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Cannot read to empty buffer.");
/// Initialize the objects that shall be used to perform read operations.
if (uncompressed_cache)
{

View File

@ -69,10 +69,6 @@ size_t MergeTreeReaderWide::readRows(size_t from_mark, bool continue_reading, si
size_t num_columns = columns.size();
checkNumberOfColumns(num_columns);
/// Pointers to offset columns that are common to the nested data structure columns.
/// If append is true, then the value will be equal to nullptr and will be used only to
/// check that the offsets column has been already read.
OffsetColumns offset_columns;
std::unordered_map<String, ISerialization::SubstreamsCache> caches;
std::unordered_set<std::string> prefetched_streams;

View File

@ -184,16 +184,16 @@ ProjectionDescription ProjectionDescription::getMinMaxCountProjection(
auto select_query = std::make_shared<ASTProjectionSelectQuery>();
ASTPtr select_expression_list = std::make_shared<ASTExpressionList>();
if (!primary_key_asts.empty())
{
select_expression_list->children.push_back(makeASTFunction("min", primary_key_asts.front()->clone()));
select_expression_list->children.push_back(makeASTFunction("max", primary_key_asts.front()->clone()));
}
for (const auto & column : minmax_columns)
{
select_expression_list->children.push_back(makeASTFunction("min", std::make_shared<ASTIdentifier>(column)));
select_expression_list->children.push_back(makeASTFunction("max", std::make_shared<ASTIdentifier>(column)));
}
if (!primary_key_asts.empty())
{
select_expression_list->children.push_back(makeASTFunction("min", primary_key_asts.front()->clone()));
select_expression_list->children.push_back(makeASTFunction("max", primary_key_asts.front()->clone()));
}
select_expression_list->children.push_back(makeASTFunction("count"));
select_query->setExpression(ASTProjectionSelectQuery::Expression::SELECT, std::move(select_expression_list));
@ -207,8 +207,14 @@ ProjectionDescription ProjectionDescription::getMinMaxCountProjection(
result.query_ast, query_context, storage, {}, SelectQueryOptions{QueryProcessingStage::WithMergeableState}.modify().ignoreAlias());
result.required_columns = select.getRequiredColumns();
result.sample_block = select.getSampleBlock();
if (!primary_key_asts.empty())
result.primary_key_max_column_name = result.sample_block.getNames()[ProjectionDescription::PRIMARY_KEY_MAX_COLUMN_POS];
/// If we have a primary key and it's not among minmax_columns, it will be used as one additional minmax column.
if (!primary_key_asts.empty() && result.sample_block.columns() == 2 * (minmax_columns.size() + 1) + 1)
{
/// min(p1), max(p1), min(p2), max(p2), ..., min(k1), max(k1), count()
/// ^
/// size - 2
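/// Worked example (illustrative table): with minmax_columns = {EventDate} and a primary key
/// column k that is not among them, the block is min(EventDate), max(EventDate), min(k),
/// max(k), count(), i.e. 5 == 2 * (1 + 1) + 1 columns, and max(k) sits at index size - 2.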
result.primary_key_max_column_name = *(result.sample_block.getNames().cend() - 2);
}
result.type = ProjectionDescription::Type::Aggregate;
StorageInMemoryMetadata metadata;
metadata.setColumns(ColumnsDescription(result.sample_block.getNamesAndTypesList()));

Some files were not shown because too many files have changed in this diff.