Merge branch 'master' into rocksdb_metacache

taiyang-li 2022-02-09 11:54:10 +08:00
commit d04ccc0489
84 changed files with 2761 additions and 268 deletions

.gitmodules vendored

@@ -259,3 +259,6 @@
[submodule "contrib/azure"]
path = contrib/azure
url = https://github.com/ClickHouse-Extras/azure-sdk-for-cpp.git
[submodule "contrib/minizip-ng"]
path = contrib/minizip-ng
url = https://github.com/zlib-ng/minizip-ng


@@ -78,6 +78,7 @@ add_contrib (croaring-cmake croaring)
add_contrib (zstd-cmake zstd)
add_contrib (zlib-ng-cmake zlib-ng)
add_contrib (bzip2-cmake bzip2)
add_contrib (minizip-ng-cmake minizip-ng)
add_contrib (snappy-cmake snappy)
add_contrib (rocksdb-cmake rocksdb)
add_contrib (thrift-cmake thrift)


@@ -56,19 +56,11 @@ list(APPEND SOURCES ${CASS_SRC_DIR}/atomic/atomic_std.hpp)
add_library(_curl_hostcheck OBJECT ${CASS_SRC_DIR}/third_party/curl/hostcheck.cpp)
add_library(_hdr_histogram OBJECT ${CASS_SRC_DIR}/third_party/hdr_histogram/hdr_histogram.cpp)
add_library(_http-parser OBJECT ${CASS_SRC_DIR}/third_party/http-parser/http_parser.c)
add_library(_minizip OBJECT
${CASS_SRC_DIR}/third_party/minizip/ioapi.c
${CASS_SRC_DIR}/third_party/minizip/zip.c
${CASS_SRC_DIR}/third_party/minizip/unzip.c)
target_link_libraries(_minizip ch_contrib::zlib)
target_compile_definitions(_minizip PRIVATE "-Dz_crc_t=unsigned long")
list(APPEND INCLUDE_DIRS
${CASS_SRC_DIR}/third_party/curl
${CASS_SRC_DIR}/third_party/hdr_histogram
${CASS_SRC_DIR}/third_party/http-parser
${CASS_SRC_DIR}/third_party/minizip
${CASS_SRC_DIR}/third_party/mt19937_64
${CASS_SRC_DIR}/third_party/rapidjson/rapidjson
${CASS_SRC_DIR}/third_party/sparsehash/src)
@@ -123,10 +115,9 @@ add_library(_cassandra
${SOURCES}
$<TARGET_OBJECTS:_curl_hostcheck>
$<TARGET_OBJECTS:_hdr_histogram>
$<TARGET_OBJECTS:_http-parser>
$<TARGET_OBJECTS:_minizip>)
$<TARGET_OBJECTS:_http-parser>)
target_link_libraries(_cassandra ch_contrib::zlib)
target_link_libraries(_cassandra ch_contrib::zlib ch_contrib::minizip)
target_include_directories(_cassandra PRIVATE ${CMAKE_CURRENT_BINARY_DIR} ${INCLUDE_DIRS})
target_include_directories(_cassandra SYSTEM BEFORE PUBLIC ${CASS_INCLUDE_DIR})
target_compile_definitions(_cassandra PRIVATE CASS_BUILDING)

contrib/minizip-ng vendored Submodule

@@ -0,0 +1 @@
Subproject commit 6cffc951851620e0fac1993be75e4713c334de03


@@ -0,0 +1,168 @@
option(ENABLE_MINIZIP "Enable minizip-ng, the zip manipulation library" ${ENABLE_LIBRARIES})
if (NOT ENABLE_MINIZIP)
message (STATUS "minizip-ng disabled")
return()
endif()
set(_MINIZIP_SOURCE_DIR "${ClickHouse_SOURCE_DIR}/contrib/minizip-ng")
# Initial source files
set(MINIZIP_SRC
${_MINIZIP_SOURCE_DIR}/mz_crypt.c
${_MINIZIP_SOURCE_DIR}/mz_os.c
${_MINIZIP_SOURCE_DIR}/mz_strm.c
${_MINIZIP_SOURCE_DIR}/mz_strm_buf.c
${_MINIZIP_SOURCE_DIR}/mz_strm_mem.c
${_MINIZIP_SOURCE_DIR}/mz_strm_split.c
${_MINIZIP_SOURCE_DIR}/mz_zip.c
${_MINIZIP_SOURCE_DIR}/mz_zip_rw.c)
# Initial header files
set(MINIZIP_HDR
${_MINIZIP_SOURCE_DIR}/mz.h
${_MINIZIP_SOURCE_DIR}/mz_os.h
${_MINIZIP_SOURCE_DIR}/mz_crypt.h
${_MINIZIP_SOURCE_DIR}/mz_strm.h
${_MINIZIP_SOURCE_DIR}/mz_strm_buf.h
${_MINIZIP_SOURCE_DIR}/mz_strm_mem.h
${_MINIZIP_SOURCE_DIR}/mz_strm_split.h
${_MINIZIP_SOURCE_DIR}/mz_strm_os.h
${_MINIZIP_SOURCE_DIR}/mz_zip.h
${_MINIZIP_SOURCE_DIR}/mz_zip_rw.h)
set(MINIZIP_INC ${_MINIZIP_SOURCE_DIR})
set(MINIZIP_DEF)
set(MINIZIP_PUBLIC_DEF)
set(MINIZIP_LIB)
# Check if zlib is present
set(MZ_ZLIB ON)
if(MZ_ZLIB)
# Use zlib from ClickHouse contrib
list(APPEND MINIZIP_LIB ch_contrib::zlib)
list(APPEND MINIZIP_SRC
${_MINIZIP_SOURCE_DIR}/mz_strm_zlib.c)
list(APPEND MINIZIP_HDR
${_MINIZIP_SOURCE_DIR}/mz_strm_zlib.h)
list(APPEND MINIZIP_DEF "-DHAVE_ZLIB")
endif()
# Check if bzip2 is present
set(MZ_BZIP2 ${ENABLE_BZIP2})
if(MZ_BZIP2)
# Use bzip2 from ClickHouse contrib
list(APPEND MINIZIP_LIB ch_contrib::bzip2)
list(APPEND MINIZIP_SRC
${_MINIZIP_SOURCE_DIR}/mz_strm_bzip.c)
list(APPEND MINIZIP_HDR
${_MINIZIP_SOURCE_DIR}/mz_strm_bzip.h)
list(APPEND MINIZIP_DEF "-DHAVE_BZIP2")
endif()
# Check if liblzma is present
set(MZ_LZMA ON)
if(MZ_LZMA)
# Use liblzma from ClickHouse contrib
list(APPEND MINIZIP_LIB ch_contrib::xz)
list(APPEND MINIZIP_SRC
${_MINIZIP_SOURCE_DIR}/mz_strm_lzma.c)
list(APPEND MINIZIP_HDR
${_MINIZIP_SOURCE_DIR}/mz_strm_lzma.h)
list(APPEND MINIZIP_DEF "-DHAVE_LZMA")
endif()
# Check if zstd is present
set(MZ_ZSTD ON)
if(MZ_ZSTD)
# Use zstd from ClickHouse contrib
list(APPEND MINIZIP_LIB ch_contrib::zstd)
list(APPEND MINIZIP_SRC
${_MINIZIP_SOURCE_DIR}/mz_strm_zstd.c)
list(APPEND MINIZIP_HDR
${_MINIZIP_SOURCE_DIR}/mz_strm_zstd.h)
list(APPEND MINIZIP_DEF "-DHAVE_ZSTD")
endif()
if(NOT MZ_ZLIB AND NOT MZ_ZSTD AND NOT MZ_BZIP2 AND NOT MZ_LZMA)
message(STATUS "Compression not supported due to missing libraries")
list(APPEND MINIZIP_DEF -DMZ_ZIP_NO_DECOMPRESSION)
list(APPEND MINIZIP_DEF -DMZ_ZIP_NO_COMPRESSION)
endif()
# Check to see if openssl installation is present
set(MZ_OPENSSL ${ENABLE_SSL})
if(MZ_OPENSSL)
# Use openssl from ClickHouse contrib
list(APPEND MINIZIP_LIB OpenSSL::SSL OpenSSL::Crypto)
list(APPEND MINIZIP_SRC
${_MINIZIP_SOURCE_DIR}/mz_crypt_openssl.c)
endif()
# Include WinZIP AES encryption
set(MZ_WZAES ${ENABLE_SSL})
if(MZ_WZAES)
list(APPEND MINIZIP_DEF -DHAVE_WZAES)
list(APPEND MINIZIP_SRC
${_MINIZIP_SOURCE_DIR}/mz_strm_wzaes.c)
list(APPEND MINIZIP_HDR
${_MINIZIP_SOURCE_DIR}/mz_strm_wzaes.h)
endif()
# Include traditional PKWare encryption
set(MZ_PKCRYPT ON)
if(MZ_PKCRYPT)
list(APPEND MINIZIP_DEF -DHAVE_PKCRYPT)
list(APPEND MINIZIP_SRC
${_MINIZIP_SOURCE_DIR}/mz_strm_pkcrypt.c)
list(APPEND MINIZIP_HDR
${_MINIZIP_SOURCE_DIR}/mz_strm_pkcrypt.h)
endif()
# Unix specific
if(UNIX)
list(APPEND MINIZIP_SRC
${_MINIZIP_SOURCE_DIR}/mz_os_posix.c
${_MINIZIP_SOURCE_DIR}/mz_strm_os_posix.c)
endif()
# Include compatibility layer
set(MZ_COMPAT ON)
if(MZ_COMPAT)
list(APPEND MINIZIP_SRC
${_MINIZIP_SOURCE_DIR}/mz_compat.c)
list(APPEND MINIZIP_HDR
${_MINIZIP_SOURCE_DIR}/mz_compat.h
zip.h
unzip.h)
list(APPEND MINIZIP_INC "${CMAKE_CURRENT_SOURCE_DIR}")
list(APPEND MINIZIP_PUBLIC_DEF "-DMZ_COMPAT_VERSION=110")
endif()
add_library(_minizip ${MINIZIP_SRC} ${MINIZIP_HDR})
target_include_directories(_minizip PUBLIC ${MINIZIP_INC})
target_compile_definitions(_minizip PUBLIC ${MINIZIP_PUBLIC_DEF})
target_compile_definitions(_minizip PRIVATE ${MINIZIP_DEF})
target_link_libraries(_minizip PRIVATE ${MINIZIP_LIB})
add_library(ch_contrib::minizip ALIAS _minizip)
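
For context, this is roughly how another target in the tree can consume the alias defined above; `my_tool` is a hypothetical target used only for illustration:

```cmake
# Hypothetical consumer target; the ch_contrib::minizip alias propagates the
# public include path and the MZ_COMPAT_VERSION definition set above.
add_executable(my_tool my_tool.cpp)
target_link_libraries(my_tool PRIVATE ch_contrib::minizip)
```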


@@ -0,0 +1,13 @@
/* unzip.h -- Compatibility layer shim
part of the minizip-ng project
This program is distributed under the terms of the same license as zlib.
See the accompanying LICENSE file for the full text of the license.
*/
#ifndef MZ_COMPAT_UNZIP
#define MZ_COMPAT_UNZIP
#include "mz_compat.h"
#endif


@@ -0,0 +1,13 @@
/* zip.h -- Compatibility layer shim
part of the minizip-ng project
This program is distributed under the terms of the same license as zlib.
See the accompanying LICENSE file for the full text of the license.
*/
#ifndef MZ_COMPAT_ZIP
#define MZ_COMPAT_ZIP
#include "mz_compat.h"
#endif


@@ -108,7 +108,13 @@ Examples of configuration for quorum with three nodes can be found in [integrati
ClickHouse Keeper is bundled into the ClickHouse server package, just add configuration of `<keeper_server>` and start ClickHouse server as always. If you want to run standalone ClickHouse Keeper you can start it in a similar way with:
```bash
clickhouse-keeper --config /etc/your_path_to_config/config.xml --daemon
clickhouse-keeper --config /etc/your_path_to_config/config.xml
```
If you don't have the symlink (`clickhouse-keeper`), you can create it or specify `keeper` as an argument:
```bash
clickhouse keeper --config /etc/your_path_to_config/config.xml
```
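
For reference, a minimal single-node `<keeper_server>` section might look like the sketch below; the ports and paths are illustrative values, not settings introduced by this commit:

```xml
<keeper_server>
    <tcp_port>9181</tcp_port>
    <server_id>1</server_id>
    <log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
    <snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
    <raft_configuration>
        <server>
            <id>1</id>
            <hostname>localhost</hostname>
            <port>9234</port>
        </server>
    </raft_configuration>
</keeper_server>
```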
## Four Letter Word Commands {#four-letter-word-commands}


@@ -27,7 +27,7 @@ To analyze the `trace_log` system table:
For security reasons, introspection functions are disabled by default.
- Use the `addressToLine`, `addressToSymbol` and `demangle` [introspection functions](../../sql-reference/functions/introspection.md) to get function names and their positions in ClickHouse code. To get a profile for some query, you need to aggregate data from the `trace_log` table. You can aggregate data by individual functions or by the whole stack traces.
- Use the `addressToLine`, `addressToLineWithInlines`, `addressToSymbol` and `demangle` [introspection functions](../../sql-reference/functions/introspection.md) to get function names and their positions in ClickHouse code. To get a profile for some query, you need to aggregate data from the `trace_log` table. You can aggregate data by individual functions or by the whole stack traces.
If you need to visualize `trace_log` info, try [flamegraph](../../interfaces/third-party/gui/#clickhouse-flamegraph) and [speedscope](https://github.com/laplab/clickhouse-speedscope).
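
As a sketch of the aggregation step described above, grouping samples by whole stack traces; the `query_id` is a placeholder you would take from `system.query_log`:

```sql
SET allow_introspection_functions = 1;

SELECT
    count() AS samples,
    arrayStringConcat(arrayMap(x -> demangle(addressToSymbol(x)), trace), '\n') AS stack
FROM system.trace_log
WHERE query_id = '<query-id>' -- placeholder
GROUP BY trace
ORDER BY samples DESC
LIMIT 10;
```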


@@ -2,7 +2,7 @@
Contains stack traces of all server threads. Allows developers to introspect the server state.
To analyze stack frames, use the `addressToLine`, `addressToSymbol` and `demangle` [introspection functions](../../sql-reference/functions/introspection.md).
To analyze stack frames, use the `addressToLine`, `addressToLineWithInlines`, `addressToSymbol` and `demangle` [introspection functions](../../sql-reference/functions/introspection.md).
Columns:


@@ -4,7 +4,7 @@ Contains stack traces collected by the sampling query profiler.
ClickHouse creates this table when the [trace_log](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-trace_log) server configuration section is set. Also the [query_profiler_real_time_period_ns](../../operations/settings/settings.md#query_profiler_real_time_period_ns) and [query_profiler_cpu_time_period_ns](../../operations/settings/settings.md#query_profiler_cpu_time_period_ns) settings should be set.
To analyze logs, use the `addressToLine`, `addressToSymbol` and `demangle` introspection functions.
To analyze logs, use the `addressToLine`, `addressToLineWithInlines`, `addressToSymbol` and `demangle` introspection functions.
Columns:


@@ -1,9 +1,9 @@
---
toc_priority: 40
toc_title: UInt8, UInt16, UInt32, UInt64, UInt256, Int8, Int16, Int32, Int64, Int128, Int256
toc_title: UInt8, UInt16, UInt32, UInt64, UInt128, UInt256, Int8, Int16, Int32, Int64, Int128, Int256
---
# UInt8, UInt16, UInt32, UInt64, UInt256, Int8, Int16, Int32, Int64, Int128, Int256 {#uint8-uint16-uint32-uint64-uint256-int8-int16-int32-int64-int128-int256}
# UInt8, UInt16, UInt32, UInt64, UInt128, UInt256, Int8, Int16, Int32, Int64, Int128, Int256
Fixed-length integers, with or without a sign.


@@ -113,6 +113,111 @@ trace_source_code_lines: /lib/x86_64-linux-gnu/libpthread-2.27.so
/build/glibc-OTsEL5/glibc-2.27/misc/../sysdeps/unix/sysv/linux/x86_64/clone.S:97
```
## addressToLineWithInlines {#addresstolinewithinlines}
Similar to `addressToLine`, but returns an Array including all inline functions; as a trade-off, it is much slower.
If you use official ClickHouse packages, you need to install the `clickhouse-common-static-dbg` package.
**Syntax**
``` sql
addressToLineWithInlines(address_of_binary_instruction)
```
**Arguments**
- `address_of_binary_instruction` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Address of instruction in a running process.
**Returned value**
- An array whose first element is the source code filename and the line number in this file, delimited by a colon. From the second element on, each inline function's source code filename, line number, and function name are listed.
- An array with a single element, the name of the binary, if the function couldn't find the debug information.
- An empty array, if the address is not valid.
Type: [Array(String)](../../sql-reference/data-types/array.md).
**Example**
Enabling introspection functions:
``` sql
SET allow_introspection_functions=1;
```
Applying the function to an address:
```sql
SELECT addressToLineWithInlines(531055181::UInt64);
```
``` text
┌─addressToLineWithInlines(CAST('531055181', 'UInt64'))────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ ['./src/Functions/addressToLineWithInlines.cpp:98','./build_normal_debug/./src/Functions/addressToLineWithInlines.cpp:176:DB::(anonymous namespace)::FunctionAddressToLineWithInlines::implCached(unsigned long) const'] │
└──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
```
Applying the function to the whole stack trace:
``` sql
SELECT
ta, addressToLineWithInlines(arrayJoin(trace) as ta)
FROM system.trace_log
WHERE
query_id = '5e173544-2020-45de-b645-5deebe2aae54';
```
The [arrayJoin](../../sql-reference/functions/array-functions.md#array-functions-join) function splits the array into rows.
``` text
┌────────ta─┬─addressToLineWithInlines(arrayJoin(trace))───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ 365497529 │ ['./build_normal_debug/./contrib/libcxx/include/string_view:252'] │
│ 365593602 │ ['./build_normal_debug/./src/Common/Dwarf.cpp:191'] │
│ 365593866 │ ['./build_normal_debug/./src/Common/Dwarf.cpp:0'] │
│ 365592528 │ ['./build_normal_debug/./src/Common/Dwarf.cpp:0'] │
│ 365591003 │ ['./build_normal_debug/./src/Common/Dwarf.cpp:477'] │
│ 365590479 │ ['./build_normal_debug/./src/Common/Dwarf.cpp:442'] │
│ 365590600 │ ['./build_normal_debug/./src/Common/Dwarf.cpp:457'] │
│ 365598941 │ ['./build_normal_debug/./src/Common/Dwarf.cpp:0'] │
│ 365607098 │ ['./build_normal_debug/./src/Common/Dwarf.cpp:0'] │
│ 365590571 │ ['./build_normal_debug/./src/Common/Dwarf.cpp:451'] │
│ 365598941 │ ['./build_normal_debug/./src/Common/Dwarf.cpp:0'] │
│ 365607098 │ ['./build_normal_debug/./src/Common/Dwarf.cpp:0'] │
│ 365590571 │ ['./build_normal_debug/./src/Common/Dwarf.cpp:451'] │
│ 365598941 │ ['./build_normal_debug/./src/Common/Dwarf.cpp:0'] │
│ 365607098 │ ['./build_normal_debug/./src/Common/Dwarf.cpp:0'] │
│ 365590571 │ ['./build_normal_debug/./src/Common/Dwarf.cpp:451'] │
│ 365598941 │ ['./build_normal_debug/./src/Common/Dwarf.cpp:0'] │
│ 365597289 │ ['./build_normal_debug/./src/Common/Dwarf.cpp:807'] │
│ 365599840 │ ['./build_normal_debug/./src/Common/Dwarf.cpp:1118'] │
│ 531058145 │ ['./build_normal_debug/./src/Functions/addressToLineWithInlines.cpp:152'] │
│ 531055181 │ ['./src/Functions/addressToLineWithInlines.cpp:98','./build_normal_debug/./src/Functions/addressToLineWithInlines.cpp:176:DB::(anonymous namespace)::FunctionAddressToLineWithInlines::implCached(unsigned long) const'] │
│ 422333613 │ ['./build_normal_debug/./src/Functions/IFunctionAdaptors.h:21'] │
│ 586866022 │ ['./build_normal_debug/./src/Functions/IFunction.cpp:216'] │
│ 586869053 │ ['./build_normal_debug/./src/Functions/IFunction.cpp:264'] │
│ 586873237 │ ['./build_normal_debug/./src/Functions/IFunction.cpp:334'] │
│ 597901620 │ ['./build_normal_debug/./src/Interpreters/ExpressionActions.cpp:601'] │
│ 597898534 │ ['./build_normal_debug/./src/Interpreters/ExpressionActions.cpp:718'] │
│ 630442912 │ ['./build_normal_debug/./src/Processors/Transforms/ExpressionTransform.cpp:23'] │
│ 546354050 │ ['./build_normal_debug/./src/Processors/ISimpleTransform.h:38'] │
│ 626026993 │ ['./build_normal_debug/./src/Processors/ISimpleTransform.cpp:89'] │
│ 626294022 │ ['./build_normal_debug/./src/Processors/Executors/ExecutionThreadContext.cpp:45'] │
│ 626293730 │ ['./build_normal_debug/./src/Processors/Executors/ExecutionThreadContext.cpp:63'] │
│ 626169525 │ ['./build_normal_debug/./src/Processors/Executors/PipelineExecutor.cpp:213'] │
│ 626170308 │ ['./build_normal_debug/./src/Processors/Executors/PipelineExecutor.cpp:178'] │
│ 626166348 │ ['./build_normal_debug/./src/Processors/Executors/PipelineExecutor.cpp:329'] │
│ 626163461 │ ['./build_normal_debug/./src/Processors/Executors/PipelineExecutor.cpp:84'] │
│ 626323536 │ ['./build_normal_debug/./src/Processors/Executors/PullingAsyncPipelineExecutor.cpp:85'] │
│ 626323277 │ ['./build_normal_debug/./src/Processors/Executors/PullingAsyncPipelineExecutor.cpp:112'] │
│ 626323133 │ ['./build_normal_debug/./contrib/libcxx/include/type_traits:3682'] │
│ 626323041 │ ['./build_normal_debug/./contrib/libcxx/include/tuple:1415'] │
└───────────┴──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
```
## addressToSymbol {#addresstosymbol}
Converts a virtual memory address inside the ClickHouse server process to a symbol from the ClickHouse object files.


@@ -172,6 +172,7 @@ Hierarchy of privileges:
- `SYSTEM FLUSH LOGS`
- [INTROSPECTION](#grant-introspection)
- `addressToLine`
- `addressToLineWithInlines`
- `addressToSymbol`
- `demangle`
- [SOURCES](#grant-sources)
@@ -430,6 +431,7 @@ Allows using [introspection](../../operations/optimizing-performance/sampling-qu
- `INTROSPECTION`. Level: `GROUP`. Aliases: `INTROSPECTION FUNCTIONS`
- `addressToLine`. Level: `GLOBAL`
- `addressToLineWithInlines`. Level: `GLOBAL`
- `addressToSymbol`. Level: `GLOBAL`
- `demangle`. Level: `GLOBAL`
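
For illustration, granting the new privilege follows the usual `GRANT` syntax; `john` is a placeholder user:

```sql
GRANT addressToLineWithInlines ON *.* TO john;
-- or grant the whole group at once:
GRANT INTROSPECTION ON *.* TO john;
```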


@@ -166,6 +166,7 @@ enum class AccessType
M(dictGet, "dictHas, dictGetHierarchy, dictIsIn", DICTIONARY, ALL) /* allows to execute functions dictGet(), dictHas(), dictGetHierarchy(), dictIsIn() */\
\
M(addressToLine, "", GLOBAL, INTROSPECTION) /* allows to execute function addressToLine() */\
M(addressToLineWithInlines, "", GLOBAL, INTROSPECTION) /* allows to execute function addressToLineWithInlines() */\
M(addressToSymbol, "", GLOBAL, INTROSPECTION) /* allows to execute function addressToSymbol() */\
M(demangle, "", GLOBAL, INTROSPECTION) /* allows to execute function demangle() */\
M(INTROSPECTION, "INTROSPECTION FUNCTIONS", GROUP, ALL) /* allows to execute functions addressToLine(), addressToSymbol(), demangle()*/\


@@ -79,6 +79,7 @@ set(dbms_sources)
add_headers_and_sources(clickhouse_common_io Common)
add_headers_and_sources(clickhouse_common_io Common/HashTable)
add_headers_and_sources(clickhouse_common_io IO)
add_headers_and_sources(clickhouse_common_io IO/Archives)
add_headers_and_sources(clickhouse_common_io IO/S3)
list (REMOVE_ITEM clickhouse_common_io_sources Common/malloc.cpp Common/new_delete.cpp)
@@ -513,6 +514,10 @@ if (TARGET ch_contrib::bzip2)
target_link_libraries (clickhouse_common_io PRIVATE ch_contrib::bzip2)
endif()
if (TARGET ch_contrib::minizip)
target_link_libraries (clickhouse_common_io PRIVATE ch_contrib::minizip)
endif ()
if (TARGET ch_contrib::simdjson)
dbms_target_link_libraries(PRIVATE ch_contrib::simdjson)
endif()


@@ -610,6 +610,8 @@
M(639, SNAPPY_COMPRESS_FAILED) \
M(640, NO_HIVEMETASTORE) \
M(641, CANNOT_APPEND_TO_FILE) \
M(642, CANNOT_PACK_ARCHIVE) \
M(643, CANNOT_UNPACK_ARCHIVE) \
\
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \


@@ -17,6 +17,7 @@
#cmakedefine01 USE_YAML_CPP
#cmakedefine01 CLICKHOUSE_SPLIT_BINARY
#cmakedefine01 USE_BZIP2
#cmakedefine01 USE_MINIZIP
#cmakedefine01 USE_SNAPPY
#cmakedefine01 USE_HIVE
#cmakedefine01 USE_ODBC


@@ -1,153 +1,58 @@
#if defined(__ELF__) && !defined(__FreeBSD__)
#include <Common/Dwarf.h>
#include <Common/SymbolIndex.h>
#include <Common/HashTable/HashMap.h>
#include <Common/Arena.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypeString.h>
#include <Functions/IFunction.h>
#include <Functions/FunctionFactory.h>
#include <IO/WriteBufferFromArena.h>
#include <IO/WriteHelpers.h>
#include <Access/Common/AccessFlags.h>
#include <Interpreters/Context.h>
#include <mutex>
#include <filesystem>
#include <unordered_map>
#include <Functions/addressToLine.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_COLUMN;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
namespace
{
class FunctionAddressToLine : public IFunction
class FunctionAddressToLine: public FunctionAddressToLineBase<StringRef, Dwarf::LocationInfoMode::FAST>
{
public:
static constexpr auto name = "addressToLine";
String getName() const override { return name; }
static FunctionPtr create(ContextPtr context)
{
context->checkAccess(AccessType::addressToLine);
return std::make_shared<FunctionAddressToLine>();
}
String getName() const override
protected:
DataTypePtr getDataType() const override
{
return name;
}
size_t getNumberOfArguments() const override
{
return 1;
}
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return true; }
DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
{
if (arguments.size() != 1)
throw Exception("Function " + getName() + " needs exactly one argument; passed "
+ toString(arguments.size()) + ".", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
const auto & type = arguments[0].type;
if (!WhichDataType(type.get()).isUInt64())
throw Exception("The only argument for function " + getName() + " must be UInt64. Found "
+ type->getName() + " instead.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return std::make_shared<DataTypeString>();
}
bool useDefaultImplementationForConstants() const override
ColumnPtr getResultColumn(const typename ColumnVector<UInt64>::Container & data, size_t input_rows_count) const override
{
return true;
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
{
const ColumnPtr & column = arguments[0].column;
const ColumnUInt64 * column_concrete = checkAndGetColumn<ColumnUInt64>(column.get());
if (!column_concrete)
throw Exception("Illegal column " + column->getName() + " of argument of function " + getName(), ErrorCodes::ILLEGAL_COLUMN);
const typename ColumnVector<UInt64>::Container & data = column_concrete->getData();
auto result_column = ColumnString::create();
for (size_t i = 0; i < input_rows_count; ++i)
{
StringRef res_str = implCached(data[i]);
result_column->insertData(res_str.data, res_str.size);
}
return result_column;
}
private:
struct Cache
void setResult(StringRef & result, const Dwarf::LocationInfo & location, const std::vector<Dwarf::SymbolizedFrame> &) const override
{
std::mutex mutex;
Arena arena;
using Map = HashMap<uintptr_t, StringRef>;
Map map;
std::unordered_map<std::string, Dwarf> dwarfs;
};
const char * arena_begin = nullptr;
WriteBufferFromArena out(cache.arena, arena_begin);
mutable Cache cache;
writeString(location.file.toString(), out);
writeChar(':', out);
writeIntText(location.line, out);
StringRef impl(uintptr_t addr) const
{
auto symbol_index_ptr = SymbolIndex::instance();
const SymbolIndex & symbol_index = *symbol_index_ptr;
if (const auto * object = symbol_index.findObject(reinterpret_cast<const void *>(addr)))
{
auto dwarf_it = cache.dwarfs.try_emplace(object->name, object->elf).first;
if (!std::filesystem::exists(object->name))
return {};
Dwarf::LocationInfo location;
std::vector<Dwarf::SymbolizedFrame> frames; // NOTE: not used in FAST mode.
if (dwarf_it->second.findAddress(addr - uintptr_t(object->address_begin), location, Dwarf::LocationInfoMode::FAST, frames))
{
const char * arena_begin = nullptr;
WriteBufferFromArena out(cache.arena, arena_begin);
writeString(location.file.toString(), out);
writeChar(':', out);
writeIntText(location.line, out);
return out.complete();
}
else
{
return object->name;
}
}
else
return {};
}
StringRef implCached(uintptr_t addr) const
{
Cache::Map::LookupResult it;
bool inserted;
std::lock_guard lock(cache.mutex);
cache.map.emplace(addr, it, inserted);
if (inserted)
it->getMapped() = impl(addr);
return it->getMapped();
result = out.complete();
}
};


@@ -0,0 +1,133 @@
#pragma once
#if defined(__ELF__) && !defined(__FreeBSD__)
#include <Common/Dwarf.h>
#include <Common/SymbolIndex.h>
#include <Common/HashTable/HashMap.h>
#include <Common/Arena.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeArray.h>
#include <Functions/IFunction.h>
#include <Functions/FunctionFactory.h>
#include <IO/WriteBufferFromArena.h>
#include <IO/WriteHelpers.h>
#include <Access/Common/AccessFlags.h>
#include <Interpreters/Context.h>
#include <mutex>
#include <filesystem>
#include <unordered_map>
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_COLUMN;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
template <typename ResultT, Dwarf::LocationInfoMode locationInfoMode>
class FunctionAddressToLineBase : public IFunction
{
public:
size_t getNumberOfArguments() const override { return 1; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return true; }
DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
{
if (arguments.size() != 1)
throw Exception(
"Function " + getName() + " needs exactly one argument; passed " + toString(arguments.size()) + ".",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
const auto & type = arguments[0].type;
if (!WhichDataType(type.get()).isUInt64())
throw Exception(
"The only argument for function " + getName() + " must be UInt64. Found " + type->getName() + " instead.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return getDataType();
}
bool useDefaultImplementationForConstants() const override { return true; }
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
{
const ColumnPtr & column = arguments[0].column;
const ColumnUInt64 * column_concrete = checkAndGetColumn<ColumnUInt64>(column.get());
if (!column_concrete)
throw Exception(
"Illegal column " + column->getName() + " of argument of function " + getName(), ErrorCodes::ILLEGAL_COLUMN);
const typename ColumnVector<UInt64>::Container & data = column_concrete->getData();
return getResultColumn(data, input_rows_count);
}
protected:
virtual DataTypePtr getDataType() const = 0;
virtual ColumnPtr getResultColumn(const typename ColumnVector<UInt64>::Container & data, size_t input_rows_count) const = 0;
virtual void
setResult(ResultT & result, const Dwarf::LocationInfo & location, const std::vector<Dwarf::SymbolizedFrame> & frames) const = 0;
struct Cache
{
std::mutex mutex;
Arena arena;
using Map = HashMap<uintptr_t, ResultT>;
Map map;
std::unordered_map<std::string, Dwarf> dwarfs;
};
mutable Cache cache;
ResultT impl(uintptr_t addr) const
{
auto symbol_index_ptr = SymbolIndex::instance();
const SymbolIndex & symbol_index = *symbol_index_ptr;
if (const auto * object = symbol_index.findObject(reinterpret_cast<const void *>(addr)))
{
auto dwarf_it = cache.dwarfs.try_emplace(object->name, object->elf).first;
if (!std::filesystem::exists(object->name))
return {};
Dwarf::LocationInfo location;
std::vector<Dwarf::SymbolizedFrame> frames; // NOTE: not used in FAST mode.
ResultT result;
if (dwarf_it->second.findAddress(addr - uintptr_t(object->address_begin), location, locationInfoMode, frames))
{
setResult(result, location, frames);
return result;
}
else
return {object->name};
}
else
return {};
}
ResultT implCached(uintptr_t addr) const
{
typename Cache::Map::LookupResult it;
bool inserted;
std::lock_guard lock(cache.mutex);
cache.map.emplace(addr, it, inserted);
if (inserted)
it->getMapped() = impl(addr);
return it->getMapped();
}
};
}
#endif


@@ -0,0 +1,99 @@
#if defined(__ELF__) && !defined(__FreeBSD__)
#include <Common/Dwarf.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnArray.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeArray.h>
#include <Functions/FunctionFactory.h>
#include <IO/WriteBufferFromArena.h>
#include <IO/WriteHelpers.h>
#include <Access/Common/AccessFlags.h>
#include <Functions/addressToLine.h>
#include <vector>
namespace DB
{
namespace
{
class FunctionAddressToLineWithInlines: public FunctionAddressToLineBase<StringRefs, Dwarf::LocationInfoMode::FULL_WITH_INLINE>
{
public:
static constexpr auto name = "addressToLineWithInlines";
String getName() const override { return name; }
static FunctionPtr create(ContextPtr context)
{
context->checkAccess(AccessType::addressToLineWithInlines);
return std::make_shared<FunctionAddressToLineWithInlines>();
}
protected:
DataTypePtr getDataType() const override
{
return std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>());
}
ColumnPtr getResultColumn(const typename ColumnVector<UInt64>::Container & data, size_t input_rows_count) const override
{
auto result_column = ColumnArray::create(ColumnString::create());
ColumnString & result_strings = typeid_cast<ColumnString &>(result_column->getData());
ColumnArray::Offsets & result_offsets = result_column->getOffsets();
ColumnArray::Offset current_offset = 0;
for (size_t i = 0; i < input_rows_count; ++i)
{
StringRefs res = implCached(data[i]);
for (auto & r : res)
result_strings.insertData(r.data, r.size);
current_offset += res.size();
result_offsets.push_back(current_offset);
}
return result_column;
}
void setResult(StringRefs & result, const Dwarf::LocationInfo & location, const std::vector<Dwarf::SymbolizedFrame> & inline_frames) const override
{
appendLocationToResult(result, location, nullptr);
for (const auto & inline_frame : inline_frames)
appendLocationToResult(result, inline_frame.location, &inline_frame);
}
private:
inline ALWAYS_INLINE void appendLocationToResult(StringRefs & result, const Dwarf::LocationInfo & location, const Dwarf::SymbolizedFrame * frame) const
{
const char * arena_begin = nullptr;
WriteBufferFromArena out(cache.arena, arena_begin);
writeString(location.file.toString(), out);
writeChar(':', out);
writeIntText(location.line, out);
if (frame)
{
writeChar(':', out);
int status = 0;
writeString(demangle(frame->name, status), out);
}
result.emplace_back(out.complete());
}
};
}
void registerFunctionAddressToLineWithInlines(FunctionFactory & factory)
{
factory.registerFunction<FunctionAddressToLineWithInlines>();
}
}
#endif


@@ -6,6 +6,7 @@ class FunctionFactory;
#if defined(OS_LINUX)
void registerFunctionAddressToSymbol(FunctionFactory & factory);
void registerFunctionAddressToLine(FunctionFactory & factory);
void registerFunctionAddressToLineWithInlines(FunctionFactory & factory);
#endif
void registerFunctionDemangle(FunctionFactory & factory);
@@ -17,6 +18,7 @@ void registerFunctionsIntrospection(FunctionFactory & factory)
#if defined(OS_LINUX)
registerFunctionAddressToSymbol(factory);
registerFunctionAddressToLine(factory);
registerFunctionAddressToLineWithInlines(factory);
#endif
registerFunctionDemangle(factory);
registerFunctionTrap(factory);


@@ -0,0 +1,60 @@
#pragma once
#include <base/types.h>
#include <memory>
namespace DB
{
class ReadBuffer;
class ReadBufferFromFileBase;
class SeekableReadBuffer;
/// Interface for reading an archive.
class IArchiveReader : public std::enable_shared_from_this<IArchiveReader>
{
public:
virtual ~IArchiveReader() = default;
/// Returns true if there is a specified file in the archive.
virtual bool fileExists(const String & filename) = 0;
struct FileInfo
{
UInt64 uncompressed_size;
UInt64 compressed_size;
int compression_method;
bool is_encrypted;
};
/// Returns the information about a file stored in the archive.
virtual FileInfo getFileInfo(const String & filename) = 0;
class FileEnumerator
{
public:
virtual ~FileEnumerator() = default;
virtual const String & getFileName() const = 0;
virtual const FileInfo & getFileInfo() const = 0;
virtual bool nextFile() = 0;
};
/// Starts enumerating files in the archive.
virtual std::unique_ptr<FileEnumerator> firstFile() = 0;
/// Starts reading a file from the archive. The function returns a read buffer;
/// you can read that buffer to extract uncompressed data from the archive.
/// Several read buffers can be used at the same time in parallel.
virtual std::unique_ptr<ReadBufferFromFileBase> readFile(const String & filename) = 0;
/// It's possible to convert a file enumerator to a read buffer and vice versa.
virtual std::unique_ptr<ReadBufferFromFileBase> readFile(std::unique_ptr<FileEnumerator> enumerator) = 0;
virtual std::unique_ptr<FileEnumerator> nextFile(std::unique_ptr<ReadBuffer> read_buffer) = 0;
/// Sets password used to decrypt files in the archive.
virtual void setPassword(const String & /* password */) {}
using ReadArchiveFunction = std::function<std::unique_ptr<SeekableReadBuffer>()>;
};
}


@@ -0,0 +1,38 @@
#pragma once
#include <base/types.h>
#include <memory>
namespace DB
{
class WriteBufferFromFileBase;
/// Interface for writing an archive.
class IArchiveWriter : public std::enable_shared_from_this<IArchiveWriter>
{
public:
/// Destructor finalizes writing the archive.
virtual ~IArchiveWriter() = default;
/// Starts writing a file to the archive. The function returns a write buffer,
/// any data written to that buffer will be compressed and then put to the archive.
/// You can keep only one such buffer at a time; the buffer returned by the previous call
/// of `writeFile()` should be destroyed before the next call of `writeFile()`.
virtual std::unique_ptr<WriteBufferFromFileBase> writeFile(const String & filename) = 0;
/// Returns true if there is an active instance of WriteBuffer returned by writeFile().
/// This function should be used mostly for debugging purposes.
virtual bool isWritingFile() const = 0;
static constexpr const int kDefaultCompressionLevel = -1;
/// Sets compression method and level.
/// Changing them will affect next file in the archive.
virtual void setCompression(int /* compression_method */, int /* compression_level */ = kDefaultCompressionLevel) {}
/// Sets password. If the password is not empty it will enable encryption in the archive.
virtual void setPassword(const String & /* password */) {}
};
}
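
And a matching sketch for the writer side; the entry name and payload are placeholders, and `finalize()` is assumed to behave as for other `WriteBuffer`s in the codebase:

```cpp
#include <IO/Archives/IArchiveWriter.h>
#include <IO/WriteBufferFromFileBase.h>
#include <IO/WriteHelpers.h>
#include <memory>

void packOneFile(const std::shared_ptr<DB::IArchiveWriter> & writer)
{
    /// Only one buffer returned by writeFile() may be alive at a time.
    auto out = writer->writeFile("hello.txt");
    DB::writeString("hello, archive", *out);
    out->finalize(); /// Flush the entry before requesting the next file.
}
```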


@@ -0,0 +1,563 @@
#include <IO/Archives/ZipArchiveReader.h>
#if USE_MINIZIP
#include <IO/ReadBufferFromFileBase.h>
#include <Common/quoteString.h>
#include <unzip.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_UNPACK_ARCHIVE;
extern const int LOGICAL_ERROR;
extern const int SEEK_POSITION_OUT_OF_BOUND;
}
using RawHandle = unzFile;
/// Holds a raw handle, calls acquireRawHandle() in the constructor and releaseRawHandle() in the destructor.
class ZipArchiveReader::HandleHolder
{
public:
HandleHolder() = default;
explicit HandleHolder(const std::shared_ptr<ZipArchiveReader> & reader_) : reader(reader_), raw_handle(reader->acquireRawHandle()) { }
~HandleHolder()
{
if (raw_handle)
{
try
{
closeFile();
}
catch (...)
{
tryLogCurrentException("ZipArchiveReader");
}
reader->releaseRawHandle(raw_handle);
}
}
HandleHolder(HandleHolder && src)
{
*this = std::move(src);
}
HandleHolder & operator =(HandleHolder && src)
{
reader = std::exchange(src.reader, nullptr);
raw_handle = std::exchange(src.raw_handle, nullptr);
file_name = std::exchange(src.file_name, {});
file_info = std::exchange(src.file_info, {});
return *this;
}
RawHandle getRawHandle() const { return raw_handle; }
std::shared_ptr<ZipArchiveReader> getReader() const { return reader; }
void locateFile(const String & file_name_)
{
resetFileInfo();
bool case_sensitive = true;
int err = unzLocateFile(raw_handle, file_name_.c_str(), reinterpret_cast<unzFileNameComparer>(static_cast<size_t>(case_sensitive)));
if (err == UNZ_END_OF_LIST_OF_FILE)
showError("File " + quoteString(file_name_) + " not found");
file_name = file_name_;
}
bool tryLocateFile(const String & file_name_)
{
resetFileInfo();
bool case_sensitive = true;
int err = unzLocateFile(raw_handle, file_name_.c_str(), reinterpret_cast<unzFileNameComparer>(static_cast<size_t>(case_sensitive)));
if (err == UNZ_END_OF_LIST_OF_FILE)
return false;
checkResult(err);
file_name = file_name_;
return true;
}
bool firstFile()
{
resetFileInfo();
int err = unzGoToFirstFile(raw_handle);
if (err == UNZ_END_OF_LIST_OF_FILE)
return false;
checkResult(err);
return true;
}
bool nextFile()
{
resetFileInfo();
int err = unzGoToNextFile(raw_handle);
if (err == UNZ_END_OF_LIST_OF_FILE)
return false;
checkResult(err);
return true;
}
const String & getFileName() const
{
if (!file_name)
retrieveFileInfo();
return *file_name;
}
const FileInfo & getFileInfo() const
{
if (!file_info)
retrieveFileInfo();
return *file_info;
}
void closeFile()
{
int err = unzCloseCurrentFile(raw_handle);
/// If err == UNZ_PARAMERROR the file is already closed.
if (err != UNZ_PARAMERROR)
checkResult(err);
}
void checkResult(int code) const { reader->checkResult(code); }
[[noreturn]] void showError(const String & message) const { reader->showError(message); }
private:
void retrieveFileInfo() const
{
if (file_name && file_info)
return;
unz_file_info64 finfo;
int err = unzGetCurrentFileInfo64(raw_handle, &finfo, nullptr, 0, nullptr, 0, nullptr, 0);
if (err == UNZ_PARAMERROR)
showError("No current file");
checkResult(err);
if (!file_info)
{
file_info.emplace();
file_info->uncompressed_size = finfo.uncompressed_size;
file_info->compressed_size = finfo.compressed_size;
file_info->compression_method = finfo.compression_method;
file_info->is_encrypted = (finfo.flag & MZ_ZIP_FLAG_ENCRYPTED);
}
if (!file_name)
{
file_name.emplace();
file_name->resize(finfo.size_filename);
checkResult(unzGetCurrentFileInfo64(raw_handle, nullptr, file_name->data(), finfo.size_filename, nullptr, 0, nullptr, 0));
}
}
void resetFileInfo()
{
file_info.reset();
file_name.reset();
}
std::shared_ptr<ZipArchiveReader> reader;
RawHandle raw_handle = nullptr;
mutable std::optional<String> file_name;
mutable std::optional<FileInfo> file_info;
};
/// This class represents a ReadBuffer actually returned by readFile().
class ZipArchiveReader::ReadBufferFromZipArchive : public ReadBufferFromFileBase
{
public:
explicit ReadBufferFromZipArchive(HandleHolder && handle_)
: ReadBufferFromFileBase(DBMS_DEFAULT_BUFFER_SIZE, nullptr, 0)
, handle(std::move(handle_))
{
const auto & file_info = handle.getFileInfo();
checkCompressionMethodIsEnabled(static_cast<CompressionMethod>(file_info.compression_method));
const char * password_cstr = nullptr;
if (file_info.is_encrypted)
{
const auto & password_str = handle.getReader()->password;
if (password_str.empty())
showError("Password is required");
password_cstr = password_str.c_str();
checkEncryptionIsEnabled();
}
RawHandle raw_handle = handle.getRawHandle();
int err = unzOpenCurrentFilePassword(raw_handle, password_cstr);
if (err == MZ_PASSWORD_ERROR)
showError("Wrong password");
checkResult(err);
}
off_t seek(off_t off, int whence) override
{
off_t current_pos = getPosition();
off_t new_pos;
if (whence == SEEK_SET)
new_pos = off;
else if (whence == SEEK_CUR)
new_pos = off + current_pos;
else
throw Exception("Only SEEK_SET and SEEK_CUR seek modes allowed.", ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
if (new_pos == current_pos)
return current_pos; /// The position is the same.
if (new_pos < 0)
throw Exception("Seek position is out of bound", ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
off_t working_buffer_start_pos = current_pos - offset();
off_t working_buffer_end_pos = current_pos + available();
if ((working_buffer_start_pos <= new_pos) && (new_pos <= working_buffer_end_pos))
{
/// The new position is still inside the buffer.
position() += new_pos - current_pos;
return new_pos;
}
RawHandle raw_handle = handle.getRawHandle();
/// Check that the new position is not beyond the end of the file.
const auto & file_info = handle.getFileInfo();
if (new_pos > static_cast<off_t>(file_info.uncompressed_size))
throw Exception("Seek position is out of bound", ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
if (file_info.compression_method == static_cast<int>(CompressionMethod::kStore))
{
/// unzSeek64() works only for non-compressed files.
checkResult(unzSeek64(raw_handle, off, whence));
return unzTell64(raw_handle);
}
/// As a last resort we take the slow path: simply ignore all data before the new position.
if (new_pos < current_pos)
{
checkResult(unzCloseCurrentFile(raw_handle));
checkResult(unzOpenCurrentFile(raw_handle));
current_pos = 0;
}
ignore(new_pos - current_pos);
return new_pos;
}
off_t getPosition() override
{
RawHandle raw_handle = handle.getRawHandle();
return unzTell64(raw_handle) - available();
}
String getFileName() const override { return handle.getFileName(); }
/// Releases owned handle to pass it to an enumerator.
HandleHolder releaseHandle() &&
{
handle.closeFile();
return std::move(handle);
}
private:
bool nextImpl() override
{
RawHandle raw_handle = handle.getRawHandle();
auto bytes_read = unzReadCurrentFile(raw_handle, internal_buffer.begin(), internal_buffer.size());
if (bytes_read < 0)
checkResult(bytes_read);
if (!bytes_read)
return false;
working_buffer = internal_buffer;
working_buffer.resize(bytes_read);
return true;
}
void checkResult(int code) const { handle.checkResult(code); }
[[noreturn]] void showError(const String & message) const { handle.showError(message); }
HandleHolder handle;
};
class ZipArchiveReader::FileEnumeratorImpl : public FileEnumerator
{
public:
explicit FileEnumeratorImpl(HandleHolder && handle_) : handle(std::move(handle_)) {}
const String & getFileName() const override { return handle.getFileName(); }
const FileInfo & getFileInfo() const override { return handle.getFileInfo(); }
bool nextFile() override { return handle.nextFile(); }
/// Releases owned handle to pass it to a read buffer.
HandleHolder releaseHandle() && { return std::move(handle); }
private:
HandleHolder handle;
};
namespace
{
/// Provides a set of functions allowing the minizip library to read its input
/// from a SeekableReadBuffer instead of an ordinary file in the local filesystem.
class StreamFromReadBuffer
{
public:
static RawHandle open(std::unique_ptr<SeekableReadBuffer> archive_read_buffer, UInt64 archive_size)
{
StreamFromReadBuffer::Opaque opaque{std::move(archive_read_buffer), archive_size};
zlib_filefunc64_def func_def;
func_def.zopen64_file = &StreamFromReadBuffer::openFileFunc;
func_def.zclose_file = &StreamFromReadBuffer::closeFileFunc;
func_def.zread_file = &StreamFromReadBuffer::readFileFunc;
func_def.zwrite_file = &StreamFromReadBuffer::writeFileFunc;
func_def.zseek64_file = &StreamFromReadBuffer::seekFunc;
func_def.ztell64_file = &StreamFromReadBuffer::tellFunc;
func_def.zerror_file = &StreamFromReadBuffer::testErrorFunc;
func_def.opaque = &opaque;
return unzOpen2_64(/* path= */ nullptr,
&func_def);
}
private:
std::unique_ptr<SeekableReadBuffer> read_buffer;
UInt64 start_offset = 0;
UInt64 total_size = 0;
bool at_end = false;
struct Opaque
{
std::unique_ptr<SeekableReadBuffer> read_buffer;
UInt64 total_size = 0;
};
static void * openFileFunc(void * opaque, const void *, int)
{
auto & opq = *reinterpret_cast<Opaque *>(opaque);
return new StreamFromReadBuffer(std::move(opq.read_buffer), opq.total_size);
}
StreamFromReadBuffer(std::unique_ptr<SeekableReadBuffer> read_buffer_, UInt64 total_size_)
: read_buffer(std::move(read_buffer_)), start_offset(read_buffer->getPosition()), total_size(total_size_) {}
static int closeFileFunc(void *, void * stream)
{
delete reinterpret_cast<StreamFromReadBuffer *>(stream);
return ZIP_OK;
}
static StreamFromReadBuffer & get(void * ptr)
{
return *reinterpret_cast<StreamFromReadBuffer *>(ptr);
}
static int testErrorFunc(void *, void *)
{
return ZIP_OK;
}
static unsigned long readFileFunc(void *, void * stream, void * buf, unsigned long size) // NOLINT(google-runtime-int)
{
auto & strm = get(stream);
if (strm.at_end)
return 0;
auto read_bytes = strm.read_buffer->read(reinterpret_cast<char *>(buf), size);
return read_bytes;
}
static ZPOS64_T tellFunc(void *, void * stream)
{
auto & strm = get(stream);
if (strm.at_end)
return strm.total_size;
auto pos = strm.read_buffer->getPosition() - strm.start_offset;
return pos;
}
static long seekFunc(void *, void * stream, ZPOS64_T offset, int origin) // NOLINT(google-runtime-int)
{
auto & strm = get(stream);
if (origin == SEEK_END)
{
/// Our implementations of SeekableReadBuffer don't support SEEK_END,
/// but the minizip library needs it, so we have to simulate it here.
strm.at_end = true;
return ZIP_OK;
}
strm.at_end = false;
if (origin == SEEK_SET)
offset += strm.start_offset;
strm.read_buffer->seek(offset, origin);
return ZIP_OK;
}
static unsigned long writeFileFunc(void *, void *, const void *, unsigned long) // NOLINT(google-runtime-int)
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "StreamFromReadBuffer::writeFile must not be called");
}
};
}
ZipArchiveReader::ZipArchiveReader(const String & path_to_archive_)
: path_to_archive(path_to_archive_)
{
init();
}
ZipArchiveReader::ZipArchiveReader(
const String & path_to_archive_, const ReadArchiveFunction & archive_read_function_, UInt64 archive_size_)
: path_to_archive(path_to_archive_), archive_read_function(archive_read_function_), archive_size(archive_size_)
{
init();
}
void ZipArchiveReader::init()
{
/// Prepare the first handle in `free_handles` and check that the archive can be read.
releaseRawHandle(acquireRawHandle());
}
ZipArchiveReader::~ZipArchiveReader()
{
/// Close all `free_handles`.
for (RawHandle free_handle : free_handles)
{
try
{
checkResult(unzClose(free_handle));
}
catch (...)
{
tryLogCurrentException("ZipArchiveReader");
}
}
}
bool ZipArchiveReader::fileExists(const String & filename)
{
return acquireHandle().tryLocateFile(filename);
}
ZipArchiveReader::FileInfo ZipArchiveReader::getFileInfo(const String & filename)
{
auto handle = acquireHandle();
handle.locateFile(filename);
return handle.getFileInfo();
}
std::unique_ptr<ZipArchiveReader::FileEnumerator> ZipArchiveReader::firstFile()
{
auto handle = acquireHandle();
if (!handle.firstFile())
return nullptr;
return std::make_unique<FileEnumeratorImpl>(std::move(handle));
}
std::unique_ptr<ReadBufferFromFileBase> ZipArchiveReader::readFile(const String & filename)
{
auto handle = acquireHandle();
handle.locateFile(filename);
return std::make_unique<ReadBufferFromZipArchive>(std::move(handle));
}
std::unique_ptr<ReadBufferFromFileBase> ZipArchiveReader::readFile(std::unique_ptr<FileEnumerator> enumerator)
{
if (!dynamic_cast<FileEnumeratorImpl *>(enumerator.get()))
throw Exception("Wrong enumerator passed to readFile()", ErrorCodes::LOGICAL_ERROR);
auto enumerator_impl = std::unique_ptr<FileEnumeratorImpl>(static_cast<FileEnumeratorImpl *>(enumerator.release()));
auto handle = std::move(*enumerator_impl).releaseHandle();
return std::make_unique<ReadBufferFromZipArchive>(std::move(handle));
}
std::unique_ptr<ZipArchiveReader::FileEnumerator> ZipArchiveReader::nextFile(std::unique_ptr<ReadBuffer> read_buffer)
{
if (!dynamic_cast<ReadBufferFromZipArchive *>(read_buffer.get()))
throw Exception("Wrong ReadBuffer passed to nextFile()", ErrorCodes::LOGICAL_ERROR);
auto read_buffer_from_zip = std::unique_ptr<ReadBufferFromZipArchive>(static_cast<ReadBufferFromZipArchive *>(read_buffer.release()));
auto handle = std::move(*read_buffer_from_zip).releaseHandle();
if (!handle.nextFile())
return nullptr;
return std::make_unique<FileEnumeratorImpl>(std::move(handle));
}
void ZipArchiveReader::setPassword(const String & password_)
{
std::lock_guard lock{mutex};
password = password_;
}
ZipArchiveReader::HandleHolder ZipArchiveReader::acquireHandle()
{
return HandleHolder{std::static_pointer_cast<ZipArchiveReader>(shared_from_this())};
}
ZipArchiveReader::RawHandle ZipArchiveReader::acquireRawHandle()
{
std::lock_guard lock{mutex};
if (!free_handles.empty())
{
RawHandle free_handle = free_handles.back();
free_handles.pop_back();
return free_handle;
}
RawHandle new_handle = nullptr;
if (archive_read_function)
new_handle = StreamFromReadBuffer::open(archive_read_function(), archive_size);
else
new_handle = unzOpen64(path_to_archive.c_str());
if (!new_handle)
throw Exception(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Couldn't open zip archive {}", quoteString(path_to_archive));
return new_handle;
}
void ZipArchiveReader::releaseRawHandle(RawHandle handle_)
{
if (!handle_)
return;
std::lock_guard lock{mutex};
free_handles.push_back(handle_);
}
void ZipArchiveReader::checkResult(int code) const
{
if (code >= UNZ_OK)
return;
String message = "Code= ";
switch (code)
{
case UNZ_OK: return;
case UNZ_ERRNO: message += "ERRNO, errno= " + String{strerror(errno)}; break;
case UNZ_PARAMERROR: message += "PARAMERROR"; break;
case UNZ_BADZIPFILE: message += "BADZIPFILE"; break;
case UNZ_INTERNALERROR: message += "INTERNALERROR"; break;
case UNZ_CRCERROR: message += "CRCERROR"; break;
case UNZ_BADPASSWORD: message += "BADPASSWORD"; break;
default: message += std::to_string(code); break;
}
showError(message);
}
void ZipArchiveReader::showError(const String & message) const
{
throw Exception(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Couldn't unpack zip archive {}: {}", quoteString(path_to_archive), message);
}
}
#endif


@@ -0,0 +1,86 @@
#pragma once
#include <Common/config.h>
#if USE_MINIZIP
#include <IO/Archives/IArchiveReader.h>
#include <IO/Archives/ZipArchiveWriter.h>
#include <base/shared_ptr_helper.h>
#include <mutex>
#include <vector>
namespace DB
{
class ReadBuffer;
class ReadBufferFromFileBase;
class SeekableReadBuffer;
/// Implementation of IArchiveReader for reading zip archives.
class ZipArchiveReader : public shared_ptr_helper<ZipArchiveReader>, public IArchiveReader
{
public:
using CompressionMethod = ZipArchiveWriter::CompressionMethod;
~ZipArchiveReader() override;
/// Returns true if there is a specified file in the archive.
bool fileExists(const String & filename) override;
/// Returns the information about a file stored in the archive.
FileInfo getFileInfo(const String & filename) override;
/// Starts enumerating files in the archive.
std::unique_ptr<FileEnumerator> firstFile() override;
/// Starts reading a file from the archive. The function returns a read buffer;
/// you can read that buffer to extract uncompressed data from the archive.
/// Several read buffers can be used at the same time in parallel.
std::unique_ptr<ReadBufferFromFileBase> readFile(const String & filename) override;
/// It's possible to convert a file enumerator to a read buffer and vice versa.
std::unique_ptr<ReadBufferFromFileBase> readFile(std::unique_ptr<FileEnumerator> enumerator) override;
std::unique_ptr<FileEnumerator> nextFile(std::unique_ptr<ReadBuffer> read_buffer) override;
/// Sets password used to decrypt the contents of the files in the archive.
void setPassword(const String & password_) override;
/// Utility functions.
static CompressionMethod parseCompressionMethod(const String & str) { return ZipArchiveWriter::parseCompressionMethod(str); }
static void checkCompressionMethodIsEnabled(CompressionMethod method) { ZipArchiveWriter::checkCompressionMethodIsEnabled(method); }
static void checkEncryptionIsEnabled() { ZipArchiveWriter::checkEncryptionIsEnabled(); }
private:
/// Constructs an archive's reader that will read from a file in the local filesystem.
explicit ZipArchiveReader(const String & path_to_archive_);
/// Constructs an archive's reader that will read by making a read buffer by using
/// a specified function.
ZipArchiveReader(const String & path_to_archive_, const ReadArchiveFunction & archive_read_function_, UInt64 archive_size_);
friend struct shared_ptr_helper<ZipArchiveReader>;
class ReadBufferFromZipArchive;
class FileEnumeratorImpl;
class HandleHolder;
using RawHandle = void *;
void init();
HandleHolder acquireHandle();
RawHandle acquireRawHandle();
void releaseRawHandle(RawHandle handle_);
void checkResult(int code) const;
[[noreturn]] void showError(const String & message) const;
const String path_to_archive;
const ReadArchiveFunction archive_read_function;
const UInt64 archive_size = 0;
String password;
std::vector<RawHandle> free_handles;
mutable std::mutex mutex;
};
}
#endif
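
Since construction goes through `shared_ptr_helper`, creating a reader presumably looks like the snippet below; the path, password, and member name are placeholders:

```cpp
auto reader = DB::ZipArchiveReader::create("/path/to/archive.zip");
reader->setPassword("secret");           /// Only needed for encrypted members.
auto in = reader->readFile("inner.txt"); /// Returns a ReadBufferFromFileBase.
```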


@@ -0,0 +1,385 @@
#include <IO/Archives/ZipArchiveWriter.h>
#if USE_MINIZIP
#include <IO/WriteBufferFromFileBase.h>
#include <Common/quoteString.h>
#include <zip.h>
#include <boost/algorithm/string/predicate.hpp>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_PACK_ARCHIVE;
extern const int SUPPORT_IS_DISABLED;
extern const int LOGICAL_ERROR;
}
using RawHandle = zipFile;
/// Holds a raw handle, calls acquireRawHandle() in the constructor and releaseRawHandle() in the destructor.
class ZipArchiveWriter::HandleHolder
{
public:
HandleHolder() = default;
explicit HandleHolder(const std::shared_ptr<ZipArchiveWriter> & writer_) : writer(writer_), raw_handle(writer->acquireRawHandle()) { }
~HandleHolder()
{
if (raw_handle)
{
try
{
int err = zipCloseFileInZip(raw_handle);
/// If err == ZIP_PARAMERROR the file is already closed.
if (err != ZIP_PARAMERROR)
checkResult(err);
}
catch (...)
{
tryLogCurrentException("ZipArchiveWriter");
}
writer->releaseRawHandle(raw_handle);
}
}
HandleHolder(HandleHolder && src)
{
*this = std::move(src);
}
HandleHolder & operator =(HandleHolder && src)
{
writer = std::exchange(src.writer, nullptr);
raw_handle = std::exchange(src.raw_handle, nullptr);
return *this;
}
RawHandle getRawHandle() const { return raw_handle; }
std::shared_ptr<ZipArchiveWriter> getWriter() const { return writer; }
void checkResult(int code) const { writer->checkResult(code); }
private:
std::shared_ptr<ZipArchiveWriter> writer;
RawHandle raw_handle = nullptr;
};
/// This class represents a WriteBuffer actually returned by writeFile().
class ZipArchiveWriter::WriteBufferFromZipArchive : public WriteBufferFromFileBase
{
public:
WriteBufferFromZipArchive(HandleHolder && handle_, const String & filename_)
: WriteBufferFromFileBase(DBMS_DEFAULT_BUFFER_SIZE, nullptr, 0)
, handle(std::move(handle_))
, filename(filename_)
{
auto compress_method = handle.getWriter()->compression_method;
auto compress_level = handle.getWriter()->compression_level;
checkCompressionMethodIsEnabled(static_cast<CompressionMethod>(compress_method));
const char * password_cstr = nullptr;
const String & password_str = handle.getWriter()->password;
if (!password_str.empty())
{
checkEncryptionIsEnabled();
password_cstr = password_str.c_str();
}
RawHandle raw_handle = handle.getRawHandle();
checkResult(zipOpenNewFileInZip3_64(
raw_handle,
filename_.c_str(),
/* zipfi= */ nullptr,
/* extrafield_local= */ nullptr,
/* size_extrafield_local= */ 0,
/* extrafield_global= */ nullptr,
/* size_extrafield_global= */ 0,
/* comment= */ nullptr,
compress_method,
compress_level,
/* raw= */ false,
/* windowBits= */ 0,
/* memLevel= */ 0,
/* strategy= */ 0,
password_cstr,
/* crc_for_crypting= */ 0,
/* zip64= */ true));
}
~WriteBufferFromZipArchive() override
{
try
{
finalize();
}
catch (...)
{
tryLogCurrentException("ZipArchiveWriter");
}
}
void sync() override { next(); }
std::string getFileName() const override { return filename; }
private:
void nextImpl() override
{
if (!offset())
return;
RawHandle raw_handle = handle.getRawHandle();
checkResult(zipWriteInFileInZip(raw_handle, working_buffer.begin(), offset()));
}
void checkResult(int code) const { handle.checkResult(code); }
HandleHolder handle;
String filename;
};
namespace
{
/// Provides a set of functions allowing the minizip library to write its output
/// to a WriteBuffer instead of an ordinary file in the local filesystem.
class StreamFromWriteBuffer
{
public:
static RawHandle open(std::unique_ptr<WriteBuffer> archive_write_buffer)
{
Opaque opaque{std::move(archive_write_buffer)};
zlib_filefunc64_def func_def;
func_def.zopen64_file = &StreamFromWriteBuffer::openFileFunc;
func_def.zclose_file = &StreamFromWriteBuffer::closeFileFunc;
func_def.zread_file = &StreamFromWriteBuffer::readFileFunc;
func_def.zwrite_file = &StreamFromWriteBuffer::writeFileFunc;
func_def.zseek64_file = &StreamFromWriteBuffer::seekFunc;
func_def.ztell64_file = &StreamFromWriteBuffer::tellFunc;
func_def.zerror_file = &StreamFromWriteBuffer::testErrorFunc;
func_def.opaque = &opaque;
return zipOpen2_64(
/* path= */ nullptr,
/* append= */ false,
/* globalcomment= */ nullptr,
&func_def);
}
private:
std::unique_ptr<WriteBuffer> write_buffer;
UInt64 start_offset = 0;
struct Opaque
{
std::unique_ptr<WriteBuffer> write_buffer;
};
static void * openFileFunc(void * opaque, const void *, int)
{
Opaque & opq = *reinterpret_cast<Opaque *>(opaque);
return new StreamFromWriteBuffer(std::move(opq.write_buffer));
}
explicit StreamFromWriteBuffer(std::unique_ptr<WriteBuffer> write_buffer_)
: write_buffer(std::move(write_buffer_)), start_offset(write_buffer->count()) {}
static int closeFileFunc(void *, void * stream)
{
delete reinterpret_cast<StreamFromWriteBuffer *>(stream);
return ZIP_OK;
}
static StreamFromWriteBuffer & get(void * ptr)
{
return *reinterpret_cast<StreamFromWriteBuffer *>(ptr);
}
static unsigned long writeFileFunc(void *, void * stream, const void * buf, unsigned long size) // NOLINT(google-runtime-int)
{
auto & strm = get(stream);
strm.write_buffer->write(reinterpret_cast<const char *>(buf), size);
return size;
}
static int testErrorFunc(void *, void *)
{
return ZIP_OK;
}
static ZPOS64_T tellFunc(void *, void * stream)
{
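        /// Positions reported to minizip are relative to where the archive begins in the underlying buffer.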
auto & strm = get(stream);
auto pos = strm.write_buffer->count() - strm.start_offset;
return pos;
}
static long seekFunc(void *, void *, ZPOS64_T, int) // NOLINT(google-runtime-int)
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "StreamFromWriteBuffer::seek must not be called");
}
static unsigned long readFileFunc(void *, void *, void *, unsigned long) // NOLINT(google-runtime-int)
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "StreamFromWriteBuffer::readFile must not be called");
}
};
}
ZipArchiveWriter::ZipArchiveWriter(const String & path_to_archive_)
: ZipArchiveWriter(path_to_archive_, nullptr)
{
}
ZipArchiveWriter::ZipArchiveWriter(const String & path_to_archive_, std::unique_ptr<WriteBuffer> archive_write_buffer_)
: path_to_archive(path_to_archive_)
{
if (archive_write_buffer_)
handle = StreamFromWriteBuffer::open(std::move(archive_write_buffer_));
else
handle = zipOpen64(path_to_archive.c_str(), /* append= */ false);
if (!handle)
throw Exception(ErrorCodes::CANNOT_PACK_ARCHIVE, "Couldn't create zip archive {}", quoteString(path_to_archive));
}
ZipArchiveWriter::~ZipArchiveWriter()
{
if (handle)
{
try
{
checkResult(zipClose(handle, /* global_comment= */ nullptr));
}
catch (...)
{
tryLogCurrentException("ZipArchiveWriter");
}
}
}
std::unique_ptr<WriteBufferFromFileBase> ZipArchiveWriter::writeFile(const String & filename)
{
return std::make_unique<WriteBufferFromZipArchive>(acquireHandle(), filename);
}
bool ZipArchiveWriter::isWritingFile() const
{
std::lock_guard lock{mutex};
return !handle;
}
void ZipArchiveWriter::setCompression(int compression_method_, int compression_level_)
{
std::lock_guard lock{mutex};
compression_method = compression_method_;
compression_level = compression_level_;
}
void ZipArchiveWriter::setPassword(const String & password_)
{
std::lock_guard lock{mutex};
password = password_;
}
ZipArchiveWriter::CompressionMethod ZipArchiveWriter::parseCompressionMethod(const String & str)
{
if (str.empty())
return CompressionMethod::kDeflate; /// Default compression method is DEFLATE.
else if (boost::iequals(str, "store"))
return CompressionMethod::kStore;
else if (boost::iequals(str, "deflate"))
return CompressionMethod::kDeflate;
else if (boost::iequals(str, "bzip2"))
return CompressionMethod::kBzip2;
else if (boost::iequals(str, "lzma"))
return CompressionMethod::kLzma;
else if (boost::iequals(str, "zstd"))
return CompressionMethod::kZstd;
else if (boost::iequals(str, "xz"))
return CompressionMethod::kXz;
else
throw Exception(ErrorCodes::CANNOT_PACK_ARCHIVE, "Unknown compression method specified for a zip archive: {}", str);
}
/// Checks that a passed compression method can be used.
void ZipArchiveWriter::checkCompressionMethodIsEnabled(CompressionMethod method)
{
switch (method)
{
case CompressionMethod::kStore: [[fallthrough]];
case CompressionMethod::kDeflate:
case CompressionMethod::kLzma:
case CompressionMethod::kXz:
case CompressionMethod::kZstd:
return;
case CompressionMethod::kBzip2:
{
#if USE_BZIP2
return;
#else
throw Exception("BZIP2 compression method is disabled", ErrorCodes::SUPPORT_IS_DISABLED);
#endif
}
}
throw Exception(ErrorCodes::CANNOT_PACK_ARCHIVE, "Unknown compression method specified for a zip archive: {}", static_cast<int>(method));
}
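For illustration, a minimal sketch (a hypothetical call site, not part of this commit) of how the two helpers above combine when a user-supplied string selects the compression method:
    auto method = ZipArchiveWriter::parseCompressionMethod("zstd");  /// -> CompressionMethod::kZstd
    ZipArchiveWriter::checkCompressionMethodIsEnabled(method);       /// throws if the method is unavailable in this build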
/// Checks that encryption is enabled.
void ZipArchiveWriter::checkEncryptionIsEnabled()
{
#if !USE_SSL
throw Exception("Encryption in zip archive is disabled", ErrorCodes::SUPPORT_IS_DISABLED);
#endif
}
ZipArchiveWriter::HandleHolder ZipArchiveWriter::acquireHandle()
{
return HandleHolder{std::static_pointer_cast<ZipArchiveWriter>(shared_from_this())};
}
RawHandle ZipArchiveWriter::acquireRawHandle()
{
std::lock_guard lock{mutex};
if (!handle)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot have more than one write buffer while writing a zip archive");
return std::exchange(handle, nullptr);
}
void ZipArchiveWriter::releaseRawHandle(RawHandle raw_handle_)
{
std::lock_guard lock{mutex};
handle = raw_handle_;
}
void ZipArchiveWriter::checkResult(int code) const
{
if (code >= ZIP_OK)
return;
String message = "Code= ";
switch (code)
{
        case ZIP_ERRNO: message += "ERRNO, errno = " + String{strerror(errno)}; break;
case ZIP_PARAMERROR: message += "PARAMERROR"; break;
case ZIP_BADZIPFILE: message += "BADZIPFILE"; break;
case ZIP_INTERNALERROR: message += "INTERNALERROR"; break;
default: message += std::to_string(code); break;
}
showError(message);
}
void ZipArchiveWriter::showError(const String & message) const
{
throw Exception(ErrorCodes::CANNOT_PACK_ARCHIVE, "Couldn't pack zip archive {}: {}", quoteString(path_to_archive), message);
}
}
#endif

View File

@ -0,0 +1,97 @@
#pragma once
#include <Common/config.h>
#if USE_MINIZIP
#include <IO/Archives/IArchiveWriter.h>
#include <base/shared_ptr_helper.h>
#include <mutex>
namespace DB
{
class WriteBuffer;
class WriteBufferFromFileBase;
/// Implementation of IArchiveWriter for writing zip archives.
class ZipArchiveWriter : public shared_ptr_helper<ZipArchiveWriter>, public IArchiveWriter
{
public:
    /// The destructor finalizes writing the archive.
~ZipArchiveWriter() override;
/// Starts writing a file to the archive. The function returns a write buffer,
/// any data written to that buffer will be compressed and then put to the archive.
    /// Only one such buffer can be kept at a time: the buffer returned by a previous call
    /// to `writeFile()` must be destroyed before the next call to `writeFile()`.
std::unique_ptr<WriteBufferFromFileBase> writeFile(const String & filename) override;
/// Returns true if there is an active instance of WriteBuffer returned by writeFile().
/// This function should be used mostly for debugging purposes.
bool isWritingFile() const override;
/// Supported compression methods.
enum class CompressionMethod
{
/// See mz.h
kStore = 0,
kDeflate = 8,
kBzip2 = 12,
kLzma = 14,
kZstd = 93,
kXz = 95,
};
/// Some compression levels.
enum class CompressionLevels
{
kDefault = kDefaultCompressionLevel,
kFast = 2,
kNormal = 6,
kBest = 9,
};
/// Sets compression method and level.
    /// Changing them will affect the next file in the archive.
void setCompression(int compression_method_, int compression_level_) override;
    /// Sets the password. Only the contents of files are encrypted;
    /// file names are not encrypted.
    /// Changing the password will affect the next file in the archive.
void setPassword(const String & password_) override;
/// Utility functions.
static CompressionMethod parseCompressionMethod(const String & str);
static void checkCompressionMethodIsEnabled(CompressionMethod method);
static void checkEncryptionIsEnabled();
private:
/// Constructs an archive that will be written as a file in the local filesystem.
explicit ZipArchiveWriter(const String & path_to_archive_);
/// Constructs an archive that will be written by using a specified `archive_write_buffer_`.
ZipArchiveWriter(const String & path_to_archive_, std::unique_ptr<WriteBuffer> archive_write_buffer_);
friend struct shared_ptr_helper<ZipArchiveWriter>;
class WriteBufferFromZipArchive;
class HandleHolder;
using RawHandle = void *;
HandleHolder acquireHandle();
RawHandle acquireRawHandle();
void releaseRawHandle(RawHandle raw_handle_);
void checkResult(int code) const;
[[noreturn]] void showError(const String & message) const;
const String path_to_archive;
int compression_method = static_cast<int>(CompressionMethod::kDeflate);
int compression_level = kDefaultCompressionLevel;
String password;
RawHandle handle = nullptr;
mutable std::mutex mutex;
};
}
#endif
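A minimal usage sketch of this interface, assuming the factory function createArchiveWriter() introduced later in this commit (the file name and contents are illustrative):
    auto writer = createArchiveWriter("/tmp/data.zip");
    writer->setCompression(static_cast<int>(ZipArchiveWriter::CompressionMethod::kDeflate),
                           static_cast<int>(ZipArchiveWriter::CompressionLevels::kNormal));
    {
        auto out = writer->writeFile("a.txt");       /// only one write buffer may exist at a time
        writeString("The contents of a.txt", *out);
    }   /// destroying the buffer finishes the file; destroying the writer finalizes the archive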

View File

@ -0,0 +1,38 @@
#include <IO/Archives/createArchiveReader.h>
#include <IO/Archives/ZipArchiveReader.h>
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_UNPACK_ARCHIVE;
extern const int SUPPORT_IS_DISABLED;
}
std::shared_ptr<IArchiveReader> createArchiveReader(const String & path_to_archive)
{
return createArchiveReader(path_to_archive, {}, 0);
}
std::shared_ptr<IArchiveReader> createArchiveReader(
const String & path_to_archive,
[[maybe_unused]] const std::function<std::unique_ptr<SeekableReadBuffer>()> & archive_read_function,
[[maybe_unused]] size_t archive_size)
{
if (path_to_archive.ends_with(".zip") || path_to_archive.ends_with(".zipx"))
{
#if USE_MINIZIP
return ZipArchiveReader::create(path_to_archive, archive_read_function, archive_size);
#else
throw Exception("minizip library is disabled", ErrorCodes::SUPPORT_IS_DISABLED);
#endif
}
else
throw Exception(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Cannot determine the type of archive {}", path_to_archive);
}
}

View File

@ -0,0 +1,22 @@
#pragma once
#include <base/types.h>
#include <memory>
namespace DB
{
class IArchiveReader;
class SeekableReadBuffer;
/// Starts reading a specified archive in the local filesystem.
std::shared_ptr<IArchiveReader> createArchiveReader(const String & path_to_archive);
/// Starts reading a specified archive; the archive is read via the specified read buffer,
/// `path_to_archive` is used only to determine the archive's type.
std::shared_ptr<IArchiveReader> createArchiveReader(
const String & path_to_archive,
const std::function<std::unique_ptr<SeekableReadBuffer>()> & archive_read_function,
size_t archive_size);
}

View File

@ -0,0 +1,38 @@
#include <IO/Archives/createArchiveWriter.h>
#include <IO/Archives/ZipArchiveWriter.h>
#include <IO/WriteBuffer.h>
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_PACK_ARCHIVE;
extern const int SUPPORT_IS_DISABLED;
}
std::shared_ptr<IArchiveWriter> createArchiveWriter(const String & path_to_archive)
{
return createArchiveWriter(path_to_archive, nullptr);
}
std::shared_ptr<IArchiveWriter> createArchiveWriter(
const String & path_to_archive,
[[maybe_unused]] std::unique_ptr<WriteBuffer> archive_write_buffer)
{
if (path_to_archive.ends_with(".zip") || path_to_archive.ends_with(".zipx"))
{
#if USE_MINIZIP
return ZipArchiveWriter::create(path_to_archive, std::move(archive_write_buffer));
#else
throw Exception("minizip library is disabled", ErrorCodes::SUPPORT_IS_DISABLED);
#endif
}
else
throw Exception(ErrorCodes::CANNOT_PACK_ARCHIVE, "Cannot determine the type of archive {}", path_to_archive);
}
}

View File

@ -0,0 +1,19 @@
#pragma once
#include <base/types.h>
#include <memory>
namespace DB
{
class IArchiveWriter;
class WriteBuffer;
/// Starts writing a specified archive in the local filesystem.
std::shared_ptr<IArchiveWriter> createArchiveWriter(const String & path_to_archive);
/// Starts writing a specified archive; the archive is written via the specified write buffer,
/// `path_to_archive` is used only to determine the archive's type.
std::shared_ptr<IArchiveWriter> createArchiveWriter(const String & path_to_archive, std::unique_ptr<WriteBuffer> archive_write_buffer);
}

View File

@ -12,31 +12,33 @@ off_t ReadBufferFromMemory::seek(off_t offset, int whence)
{
if (whence == SEEK_SET)
{
if (offset >= 0 && working_buffer.begin() + offset < working_buffer.end())
if (offset >= 0 && internal_buffer.begin() + offset < internal_buffer.end())
{
pos = working_buffer.begin() + offset;
return size_t(pos - working_buffer.begin());
pos = internal_buffer.begin() + offset;
working_buffer = internal_buffer; /// We need to restore `working_buffer` in case the position was at EOF before this seek().
return size_t(pos - internal_buffer.begin());
}
else
throw Exception(
"Seek position is out of bounds. "
"Offset: "
+ std::to_string(offset) + ", Max: " + std::to_string(size_t(working_buffer.end() - working_buffer.begin())),
+ std::to_string(offset) + ", Max: " + std::to_string(size_t(internal_buffer.end() - internal_buffer.begin())),
ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
}
else if (whence == SEEK_CUR)
{
Position new_pos = pos + offset;
if (new_pos >= working_buffer.begin() && new_pos < working_buffer.end())
if (new_pos >= internal_buffer.begin() && new_pos < internal_buffer.end())
{
pos = new_pos;
return size_t(pos - working_buffer.begin());
working_buffer = internal_buffer; /// We need to restore `working_buffer` in case the position was at EOF before this seek().
return size_t(pos - internal_buffer.begin());
}
else
throw Exception(
"Seek position is out of bounds. "
"Offset: "
+ std::to_string(offset) + ", Max: " + std::to_string(size_t(working_buffer.end() - working_buffer.begin())),
+ std::to_string(offset) + ", Max: " + std::to_string(size_t(internal_buffer.end() - internal_buffer.begin())),
ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
}
else
@ -45,7 +47,7 @@ off_t ReadBufferFromMemory::seek(off_t offset, int whence)
off_t ReadBufferFromMemory::getPosition()
{
return pos - working_buffer.begin();
return pos - internal_buffer.begin();
}
}
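The restoration of `working_buffer` above matters when seeking back after reading to EOF; a small illustrative sketch (not part of the commit):
    std::string data = "hello";
    ReadBufferFromMemory in(data.data(), data.size());
    String str;
    readStringUntilEOF(str, in);   /// consumes the whole buffer, working_buffer is exhausted
    in.seek(0, SEEK_SET);          /// restores working_buffer, so reading can resume
    str.clear();
    readStringUntilEOF(str, in);   /// reads "hello" again thanks to the fix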

View File

@ -0,0 +1,341 @@
#include <gtest/gtest.h>
#include <Common/config.h>
#include <IO/Archives/IArchiveReader.h>
#include <IO/Archives/IArchiveWriter.h>
#include <IO/Archives/createArchiveReader.h>
#include <IO/Archives/createArchiveWriter.h>
#include <IO/ReadBufferFromFileBase.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFileBase.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>
#include <Common/Exception.h>
#include <Poco/TemporaryFile.h>
#include <filesystem>
namespace DB::ErrorCodes
{
extern const int CANNOT_UNPACK_ARCHIVE;
}
namespace fs = std::filesystem;
using namespace DB;
class ArchiveReaderAndWriterTest : public ::testing::TestWithParam<const char *>
{
public:
ArchiveReaderAndWriterTest()
{
const char * archive_file_ext = GetParam();
path_to_archive = temp_folder.path() + "/archive" + archive_file_ext;
fs::create_directories(temp_folder.path());
}
const String & getPathToArchive() const { return path_to_archive; }
static void expectException(int code, const String & message, const std::function<void()> & func)
{
try
{
func();
}
catch (Exception & e)
{
if ((e.code() != code) || (e.message().find(message) == String::npos))
throw;
}
}
private:
Poco::TemporaryFile temp_folder;
String path_to_archive;
};
TEST_P(ArchiveReaderAndWriterTest, EmptyArchive)
{
/// Make an archive.
{
createArchiveWriter(getPathToArchive());
}
/// The created archive can be found in the local filesystem.
ASSERT_TRUE(fs::exists(getPathToArchive()));
/// Read the archive.
auto reader = createArchiveReader(getPathToArchive());
EXPECT_FALSE(reader->fileExists("nofile.txt"));
expectException(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "File 'nofile.txt' not found",
[&]{ reader->getFileInfo("nofile.txt"); });
expectException(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "File 'nofile.txt' not found",
[&]{ reader->readFile("nofile.txt"); });
EXPECT_EQ(reader->firstFile(), nullptr);
}
TEST_P(ArchiveReaderAndWriterTest, SingleFileInArchive)
{
/// Make an archive.
std::string_view contents = "The contents of a.txt";
{
auto writer = createArchiveWriter(getPathToArchive());
{
auto out = writer->writeFile("a.txt");
writeString(contents, *out);
}
}
/// Read the archive.
auto reader = createArchiveReader(getPathToArchive());
ASSERT_TRUE(reader->fileExists("a.txt"));
auto file_info = reader->getFileInfo("a.txt");
EXPECT_EQ(file_info.uncompressed_size, contents.size());
EXPECT_GT(file_info.compressed_size, 0);
{
auto in = reader->readFile("a.txt");
String str;
readStringUntilEOF(str, *in);
EXPECT_EQ(str, contents);
}
{
/// Use an enumerator.
auto enumerator = reader->firstFile();
ASSERT_NE(enumerator, nullptr);
EXPECT_EQ(enumerator->getFileName(), "a.txt");
EXPECT_EQ(enumerator->getFileInfo().uncompressed_size, contents.size());
EXPECT_GT(enumerator->getFileInfo().compressed_size, 0);
EXPECT_FALSE(enumerator->nextFile());
}
{
        /// Convert an enumerator to a read buffer and vice versa.
auto enumerator = reader->firstFile();
ASSERT_NE(enumerator, nullptr);
EXPECT_EQ(enumerator->getFileName(), "a.txt");
auto in = reader->readFile(std::move(enumerator));
String str;
readStringUntilEOF(str, *in);
EXPECT_EQ(str, contents);
enumerator = reader->nextFile(std::move(in));
EXPECT_EQ(enumerator, nullptr);
}
{
        /// Misusing an enumerator throws an exception.
auto enumerator = reader->firstFile();
ASSERT_NE(enumerator, nullptr);
EXPECT_FALSE(enumerator->nextFile());
expectException(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "No current file",
[&]{ enumerator->getFileName(); });
expectException(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "No current file",
[&] { reader->readFile(std::move(enumerator)); });
}
}
TEST_P(ArchiveReaderAndWriterTest, TwoFilesInArchive)
{
/// Make an archive.
std::string_view a_contents = "The contents of a.txt";
std::string_view c_contents = "The contents of b/c.txt";
{
auto writer = createArchiveWriter(getPathToArchive());
{
auto out = writer->writeFile("a.txt");
writeString(a_contents, *out);
}
{
auto out = writer->writeFile("b/c.txt");
writeString(c_contents, *out);
}
}
/// Read the archive.
auto reader = createArchiveReader(getPathToArchive());
ASSERT_TRUE(reader->fileExists("a.txt"));
ASSERT_TRUE(reader->fileExists("b/c.txt"));
EXPECT_EQ(reader->getFileInfo("a.txt").uncompressed_size, a_contents.size());
EXPECT_EQ(reader->getFileInfo("b/c.txt").uncompressed_size, c_contents.size());
{
auto in = reader->readFile("a.txt");
String str;
readStringUntilEOF(str, *in);
EXPECT_EQ(str, a_contents);
}
{
auto in = reader->readFile("b/c.txt");
String str;
readStringUntilEOF(str, *in);
EXPECT_EQ(str, c_contents);
}
{
/// Read a.txt again.
auto in = reader->readFile("a.txt");
String str;
readStringUntilEOF(str, *in);
EXPECT_EQ(str, a_contents);
}
{
/// Use an enumerator.
auto enumerator = reader->firstFile();
ASSERT_NE(enumerator, nullptr);
EXPECT_EQ(enumerator->getFileName(), "a.txt");
EXPECT_EQ(enumerator->getFileInfo().uncompressed_size, a_contents.size());
EXPECT_TRUE(enumerator->nextFile());
EXPECT_EQ(enumerator->getFileName(), "b/c.txt");
EXPECT_EQ(enumerator->getFileInfo().uncompressed_size, c_contents.size());
EXPECT_FALSE(enumerator->nextFile());
}
{
        /// Convert an enumerator to a read buffer and vice versa.
auto enumerator = reader->firstFile();
ASSERT_NE(enumerator, nullptr);
EXPECT_EQ(enumerator->getFileName(), "a.txt");
auto in = reader->readFile(std::move(enumerator));
String str;
readStringUntilEOF(str, *in);
EXPECT_EQ(str, a_contents);
enumerator = reader->nextFile(std::move(in));
ASSERT_NE(enumerator, nullptr);
EXPECT_EQ(enumerator->getFileName(), "b/c.txt");
in = reader->readFile(std::move(enumerator));
readStringUntilEOF(str, *in);
EXPECT_EQ(str, c_contents);
enumerator = reader->nextFile(std::move(in));
EXPECT_EQ(enumerator, nullptr);
}
}
TEST_P(ArchiveReaderAndWriterTest, InMemory)
{
String archive_in_memory;
/// Make an archive.
std::string_view a_contents = "The contents of a.txt";
std::string_view b_contents = "The contents of b.txt";
{
auto writer = createArchiveWriter(getPathToArchive(), std::make_unique<WriteBufferFromString>(archive_in_memory));
{
auto out = writer->writeFile("a.txt");
writeString(a_contents, *out);
}
{
auto out = writer->writeFile("b.txt");
writeString(b_contents, *out);
}
}
    /// The created archive exists only in memory.
ASSERT_FALSE(fs::exists(getPathToArchive()));
/// Read the archive.
auto read_archive_func = [&]() -> std::unique_ptr<SeekableReadBuffer> { return std::make_unique<ReadBufferFromString>(archive_in_memory); };
auto reader = createArchiveReader(getPathToArchive(), read_archive_func, archive_in_memory.size());
ASSERT_TRUE(reader->fileExists("a.txt"));
ASSERT_TRUE(reader->fileExists("b.txt"));
EXPECT_EQ(reader->getFileInfo("a.txt").uncompressed_size, a_contents.size());
EXPECT_EQ(reader->getFileInfo("b.txt").uncompressed_size, b_contents.size());
{
auto in = reader->readFile("a.txt");
String str;
readStringUntilEOF(str, *in);
EXPECT_EQ(str, a_contents);
}
{
auto in = reader->readFile("b.txt");
String str;
readStringUntilEOF(str, *in);
EXPECT_EQ(str, b_contents);
}
{
/// Read a.txt again.
auto in = reader->readFile("a.txt");
String str;
readStringUntilEOF(str, *in);
EXPECT_EQ(str, a_contents);
}
}
TEST_P(ArchiveReaderAndWriterTest, Password)
{
/// Make an archive.
std::string_view contents = "The contents of a.txt";
{
auto writer = createArchiveWriter(getPathToArchive());
writer->setPassword("Qwe123");
{
auto out = writer->writeFile("a.txt");
writeString(contents, *out);
}
}
/// Read the archive.
auto reader = createArchiveReader(getPathToArchive());
/// Try to read without a password.
expectException(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Password is required",
[&]{ reader->readFile("a.txt"); });
{
/// Try to read with a wrong password.
reader->setPassword("123Qwe");
expectException(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Wrong password",
[&]{ reader->readFile("a.txt"); });
}
{
/// Reading with the right password is successful.
reader->setPassword("Qwe123");
auto in = reader->readFile("a.txt");
String str;
readStringUntilEOF(str, *in);
EXPECT_EQ(str, contents);
}
}
TEST_P(ArchiveReaderAndWriterTest, ArchiveNotExist)
{
expectException(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Couldn't open",
[&]{ createArchiveReader(getPathToArchive()); });
}
namespace
{
const char * supported_archive_file_exts[] =
{
#if USE_MINIZIP
".zip",
#endif
};
}
INSTANTIATE_TEST_SUITE_P(All, ArchiveReaderAndWriterTest, ::testing::ValuesIn(supported_archive_file_exts));

View File

@ -602,8 +602,8 @@ NameSet ActionsDAG::foldActionsByProjection(
std::unordered_set<const Node *> visited_nodes;
std::unordered_set<std::string_view> visited_index_names;
std::stack<Node *> stack;
std::vector<const ColumnWithTypeAndName *> missing_input_from_projection_keys;
/// Record all needed index nodes to start folding.
for (const auto & node : index)
{
if (required_columns.find(node->result_name) != required_columns.end() || node->result_name == predicate_column_name)
@ -614,6 +614,9 @@ NameSet ActionsDAG::foldActionsByProjection(
}
}
    /// If some required columns are not in any index node, try searching among all projection key
    /// columns. If some are still missing, return an empty set, which means the current projection
    /// fails to match (missing columns).
if (add_missing_keys)
{
for (const auto & column : required_columns)
@ -636,6 +639,7 @@ NameSet ActionsDAG::foldActionsByProjection(
}
}
/// Traverse the DAG from root to leaf. Substitute any matched node with columns in projection_block_for_keys.
while (!stack.empty())
{
auto * node = stack.top();
@ -664,10 +668,12 @@ NameSet ActionsDAG::foldActionsByProjection(
}
}
/// Clean up unused nodes after folding.
std::erase_if(inputs, [&](const Node * node) { return visited_nodes.count(node) == 0; });
std::erase_if(index, [&](const Node * node) { return visited_index_names.count(node->result_name) == 0; });
nodes.remove_if([&](const Node & node) { return visited_nodes.count(&node) == 0; });
/// Calculate the required columns after folding.
NameSet next_required_columns;
for (const auto & input : inputs)
next_required_columns.insert(input->result_name);

View File

@ -163,12 +163,43 @@ public:
void removeUnusedActions(const Names & required_names, bool allow_remove_inputs = true, bool allow_constant_folding = true);
void removeUnusedActions(const NameSet & required_names, bool allow_remove_inputs = true, bool allow_constant_folding = true);
/// Transform the current DAG in a way that leaf nodes get folded into their parents. It's done
/// because each projection can provide some columns as inputs to substitute certain sub-DAGs
/// (expressions). Consider the following example:
/// CREATE TABLE tbl (dt DateTime, val UInt64,
/// PROJECTION p_hour (SELECT SUM(val) GROUP BY toStartOfHour(dt)));
///
/// Query: SELECT toStartOfHour(dt), SUM(val) FROM tbl GROUP BY toStartOfHour(dt);
///
/// We will have an ActionsDAG like this:
/// FUNCTION: toStartOfHour(dt) SUM(val)
/// ^ ^
/// | |
/// INPUT: dt val
///
/// Now we traverse the DAG and see if any FUNCTION node can be replaced by projection's INPUT node.
/// The result DAG will be:
/// INPUT: toStartOfHour(dt) SUM(val)
///
    /// We don't need aggregate columns from the projection because they are matched after the DAG is folded.
/// Currently we use canonical names of each node to find matches. It can be improved after we
/// have a full-featured name binding system.
///
    /// @param required_columns should contain the columns this DAG is required to produce after folding. It is used for the result actions.
    /// @param projection_block_for_keys contains all key columns of the given projection.
    /// @param predicate_column_name, if non-empty, names the predicate column that must be produced after folding.
    /// @param add_missing_keys specifies whether missing columns may be added to input nodes directly from projection key columns.
    /// @return the required columns for this folded DAG. They are expected to be fewer than the original ones if a projection is used.
NameSet foldActionsByProjection(
const NameSet & required_columns,
const Block & projection_block_for_keys,
const String & predicate_column_name = {},
bool add_missing_keys = true);
/// Reorder the index nodes using given position mapping.
void reorderAggregationKeysForProjection(const std::unordered_map<std::string_view, size_t> & key_names_pos_map);
/// Add aggregate columns to index nodes from projection
void addAggregatesViaProjection(const Block & aggregates);
bool hasArrayJoin() const;

View File

@ -944,7 +944,10 @@ static std::unique_ptr<QueryPlan> buildJoinedPlan(
* - JOIN tables will need aliases to correctly resolve USING clause.
*/
auto interpreter = interpretSubquery(
join_element.table_expression, context, original_right_columns, query_options.copy().setWithAllColumns().ignoreAlias(false));
join_element.table_expression,
context,
original_right_columns,
query_options.copy().setWithAllColumns().ignoreProjections(false).ignoreAlias(false));
auto joined_plan = std::make_unique<QueryPlan>();
interpreter->buildQueryPlan(*joined_plan);
{

View File

@ -1180,8 +1180,10 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
{
executeAggregation(
query_plan, expressions.before_aggregation, aggregate_overflow_row, aggregate_final, query_info.input_order_info);
/// We need to reset input order info, so that executeOrder can't use it
query_info.input_order_info.reset();
if (query_info.projection)
query_info.projection->input_order_info.reset();
}
// Now we must execute:
@ -1950,7 +1952,7 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
query_info.projection->order_optimizer = std::make_shared<ReadInOrderOptimizer>(
query,
query_info.projection->group_by_elements_actions,
getSortDescriptionFromGroupBy(query),
query_info.projection->group_by_elements_order_descr,
query_info.syntax_analyzer_result);
}
else

View File

@ -20,7 +20,7 @@ void ASTProjectionDeclaration::formatImpl(const FormatSettings & settings, Forma
settings.ostr << backQuoteIfNeed(name);
std::string indent_str = settings.one_line ? "" : std::string(4u * frame.indent, ' ');
std::string nl_or_nothing = settings.one_line ? "" : "\n";
settings.ostr << nl_or_nothing << indent_str << "(" << nl_or_nothing;
settings.ostr << settings.nl_or_ws << indent_str << "(" << nl_or_nothing;
FormatStateStacked frame_nested = frame;
frame_nested.need_parens = false;
++frame_nested.indent;

View File

@ -72,8 +72,18 @@ void ASTProjectionSelectQuery::formatImpl(const FormatSettings & s, FormatState
if (orderBy())
{
/// Let's convert the ASTFunction into an ASTExpressionList, which generates a consistent format
/// between the GROUP BY and ORDER BY projection definitions.
s.ostr << (s.hilite ? hilite_keyword : "") << s.nl_or_ws << indent_str << "ORDER BY " << (s.hilite ? hilite_none : "");
orderBy()->formatImpl(s, state, frame);
ASTPtr order_by;
if (auto * func = orderBy()->as<ASTFunction>())
order_by = func->arguments;
else
{
order_by = std::make_shared<ASTExpressionList>();
order_by->children.push_back(orderBy());
}
s.one_line ? order_by->formatImpl(s, state, frame) : order_by->as<ASTExpressionList &>().formatImplMultiline(s, state, frame);
}
}

View File

@ -55,6 +55,7 @@ bool ParserProjectionSelectQuery::parseImpl(Pos & pos, ASTPtr & node, Expected &
return false;
}
// ORDER BY needs to be an ASTFunction so that we can use it as a sorting key
if (s_order_by.ignore(pos, expected))
{
ASTPtr expr_list;

View File

@ -63,9 +63,9 @@ ForkProcessor::Status ForkProcessor::prepare()
{
++num_processed_outputs;
if (num_processed_outputs == num_active_outputs)
output.push(std::move(data)); // NOLINT Can push because no full or unneeded outputs.
output.push(std::move(data)); /// NOLINT Can push because no full or unneeded outputs.
else
output.push(data.clone());
output.push(data.clone()); /// NOLINT
}
}

View File

@ -20,7 +20,7 @@ namespace ErrorCodes
TSKVRowInputFormat::TSKVRowInputFormat(ReadBuffer & in_, Block header_, Params params_, const FormatSettings & format_settings_)
: IRowInputFormat(std::move(header_), in_, std::move(params_)), format_settings(format_settings_), name_map(header_.columns())
: IRowInputFormat(header_, in_, std::move(params_)), format_settings(format_settings_), name_map(header_.columns())
{
const auto & sample_block = getPort().getHeader();
size_t num_columns = sample_block.columns();

View File

@ -9,7 +9,6 @@
#include <Common/SettingsChanges.h>
#include <Common/setThreadName.h>
#include <Common/Stopwatch.h>
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <DataTypes/DataTypeFactory.h>
#include <QueryPipeline/ProfileInfo.h>
#include <Interpreters/Context.h>
@ -990,41 +989,10 @@ namespace
assert(!pipeline);
auto source = query_context->getInputFormat(
input_format, *read_buffer, header, query_context->getSettings().max_insert_block_size);
QueryPipelineBuilder builder;
builder.init(Pipe(source));
/// Add default values if necessary.
if (ast)
{
if (insert_query)
{
auto table_id = StorageID::createEmpty();
if (insert_query->table_id)
{
table_id = query_context->resolveStorageID(insert_query->table_id, Context::ResolveOrdinary);
}
else
{
StorageID local_table_id(insert_query->getDatabase(), insert_query->getTable());
table_id = query_context->resolveStorageID(local_table_id, Context::ResolveOrdinary);
}
if (query_context->getSettingsRef().input_format_defaults_for_omitted_fields && table_id)
{
StoragePtr storage = DatabaseCatalog::instance().getTable(table_id, query_context);
const auto & columns = storage->getInMemoryMetadataPtr()->getColumns();
if (!columns.empty())
{
builder.addSimpleTransform([&](const Block & cur_header)
{
return std::make_shared<AddingDefaultsTransform>(cur_header, columns, *source, query_context);
});
}
}
}
}
pipeline = std::make_unique<QueryPipeline>(QueryPipelineBuilder::getPipeline(std::move(builder)));
pipeline_executor = std::make_unique<PullingPipelineExecutor>(*pipeline);
}

View File

@ -8,6 +8,9 @@
#include <Interpreters/TreeRewriter.h>
#include <Storages/extractKeyExpressionList.h>
#include <Common/quoteString.h>
#include <Interpreters/FunctionNameNormalizer.h>
#include <Parsers/ExpressionListParsers.h>
#include <Parsers/parseQuery.h>
namespace DB
@ -161,4 +164,17 @@ KeyDescription KeyDescription::buildEmptyKey()
return result;
}
KeyDescription KeyDescription::parse(const String & str, const ColumnsDescription & columns, ContextPtr context)
{
KeyDescription result;
if (str.empty())
return result;
ParserExpression parser;
ASTPtr ast = parseQuery(parser, "(" + str + ")", 0, DBMS_DEFAULT_MAX_PARSER_DEPTH);
FunctionNameNormalizer().visit(ast.get());
return getKeyFromAST(ast, columns, context);
}
}

View File

@ -76,6 +76,9 @@ struct KeyDescription
/// Substitute modulo with moduloLegacy. Used in KeyCondition to allow proper comparison with keys.
static bool moduloToModuloLegacyRecursive(ASTPtr node_expr);
/// Parse description from string
static KeyDescription parse(const String & str, const ColumnsDescription & columns, ContextPtr context);
};
}
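A hedged sketch of how this helper is used by the ZooKeeper metadata check later in this commit (`from_zk`, `columns`, and `context` are assumed to be in scope):
    /// Re-parse the key stored in ZooKeeper and format it locally, so the comparison
    /// is insensitive to formatting differences between server versions.
    String parsed_zk_primary_key = formattedAST(
        KeyDescription::parse(from_zk.primary_key, columns, context).expression_list_ast);
    bool matches = (primary_key == parsed_zk_primary_key);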

View File

@ -35,6 +35,7 @@
#include <Parsers/ASTNameTypePair.h>
#include <Parsers/ASTPartition.h>
#include <Parsers/ASTSetQuery.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/ExpressionListParsers.h>
#include <Parsers/parseQuery.h>
#include <Parsers/queryToString.h>
@ -4509,6 +4510,7 @@ Block MergeTreeData::getMinMaxCountProjectionBlock(
const SelectQueryInfo & query_info,
const DataPartsVector & parts,
DataPartsVector & normal_parts,
const PartitionIdToMaxBlock * max_block_numbers_to_read,
ContextPtr query_context) const
{
if (!metadata_snapshot->minmax_count_projection)
@ -4545,6 +4547,23 @@ Block MergeTreeData::getMinMaxCountProjectionBlock(
if (virtual_columns_block.rows() == 0)
return {};
std::optional<PartitionPruner> partition_pruner;
std::optional<KeyCondition> minmax_idx_condition;
DataTypes minmax_columns_types;
if (metadata_snapshot->hasPartitionKey())
{
const auto & partition_key = metadata_snapshot->getPartitionKey();
auto minmax_columns_names = getMinMaxColumnsNames(partition_key);
minmax_columns_types = getMinMaxColumnsTypes(partition_key);
minmax_idx_condition.emplace(
query_info,
query_context,
minmax_columns_names,
getMinMaxExpr(partition_key, ExpressionActionsSettings::fromContext(query_context)));
partition_pruner.emplace(metadata_snapshot, query_info, query_context, false /* strict */);
}
// Generate valid expressions for filtering
VirtualColumnUtils::prepareFilterBlockWithQuery(query_info.query, query_context, virtual_columns_block, expression_ast);
if (expression_ast)
@ -4553,6 +4572,8 @@ Block MergeTreeData::getMinMaxCountProjectionBlock(
size_t rows = virtual_columns_block.rows();
const ColumnString & part_name_column = typeid_cast<const ColumnString &>(*virtual_columns_block.getByName("_part").column);
size_t part_idx = 0;
auto filter_column = ColumnUInt8::create();
auto & filter_column_data = filter_column->getData();
for (size_t row = 0; row < rows; ++row)
{
while (parts[part_idx]->name != part_name_column.getDataAt(row))
@ -4563,12 +4584,32 @@ Block MergeTreeData::getMinMaxCountProjectionBlock(
if (!part->minmax_idx->initialized)
throw Exception("Found a non-empty part with uninitialized minmax_idx. It's a bug", ErrorCodes::LOGICAL_ERROR);
filter_column_data.emplace_back();
if (max_block_numbers_to_read)
{
auto blocks_iterator = max_block_numbers_to_read->find(part->info.partition_id);
if (blocks_iterator == max_block_numbers_to_read->end() || part->info.max_block > blocks_iterator->second)
continue;
}
if (minmax_idx_condition
&& !minmax_idx_condition->checkInHyperrectangle(part->minmax_idx->hyperrectangle, minmax_columns_types).can_be_true)
continue;
if (partition_pruner)
{
if (partition_pruner->canBePruned(*part))
continue;
}
if (need_primary_key_max_column && !part->index_granularity.hasFinalMark())
{
normal_parts.push_back(part);
continue;
}
filter_column_data.back() = 1;
size_t pos = 0;
for (size_t i : metadata_snapshot->minmax_count_projection->partition_value_indices)
{
@ -4611,6 +4652,16 @@ Block MergeTreeData::getMinMaxCountProjectionBlock(
}
block.setColumns(std::move(partition_minmax_count_columns));
FilterDescription filter(*filter_column);
for (size_t i = 0; i < virtual_columns_block.columns(); ++i)
{
ColumnPtr & column = virtual_columns_block.safeGetByPosition(i).column;
column = column->filter(*filter.data, -1);
}
if (block.rows() == 0)
return {};
Block res;
for (const auto & name : required_columns)
{
@ -4635,21 +4686,31 @@ std::optional<ProjectionCandidate> MergeTreeData::getQueryProcessingStageWithAgg
if (!settings.allow_experimental_projection_optimization || query_info.ignore_projections || query_info.is_projection_query)
return std::nullopt;
const auto & query_ptr = query_info.original_query;
// Currently projections don't support parallel replicas reading yet.
if (settings.parallel_replicas_count > 1 || settings.max_parallel_replicas > 1)
return std::nullopt;
if (auto * select = query_ptr->as<ASTSelectQuery>(); select)
{
// Currently projections don't support final yet.
if (select->final())
return std::nullopt;
auto query_ptr = query_info.original_query;
auto * select_query = query_ptr->as<ASTSelectQuery>();
if (!select_query)
return std::nullopt;
// Currently projections don't support ARRAY JOIN yet.
if (select->arrayJoinExpressionList().first)
return std::nullopt;
}
// Currently projections don't support final yet.
if (select_query->final())
return std::nullopt;
// Currently projections don't support sampling yet.
if (settings.parallel_replicas_count > 1)
    // Currently projections don't support sampling yet.
if (select_query->sampleSize())
return std::nullopt;
// Currently projections don't support ARRAY JOIN yet.
if (select_query->arrayJoinExpressionList().first)
return std::nullopt;
// In order to properly analyze joins, aliases should be recognized. However, aliases get lost during projection analysis.
// Let's disable projection if there are any JOIN clauses.
// TODO: We need a better identifier resolution mechanism for projection analysis.
if (select_query->join())
return std::nullopt;
InterpreterSelectQuery select(
@ -4684,7 +4745,7 @@ std::optional<ProjectionCandidate> MergeTreeData::getQueryProcessingStageWithAgg
keys.insert(desc.name);
key_name_pos_map.insert({desc.name, pos++});
}
auto actions_settings = ExpressionActionsSettings::fromSettings(settings);
auto actions_settings = ExpressionActionsSettings::fromSettings(settings, CompileExpressions::yes);
// All required columns should be provided by either current projection or previous actions
// Let's traverse backward to finish the check.
@ -4720,7 +4781,7 @@ std::optional<ProjectionCandidate> MergeTreeData::getQueryProcessingStageWithAgg
required_columns.erase(column.name);
{
// Prewhere_action should not add missing keys.
// prewhere_action should not add missing keys.
auto new_prewhere_required_columns = prewhere_actions->foldActionsByProjection(
prewhere_required_columns, projection.sample_block_for_keys, candidate.prewhere_info->prewhere_column_name, false);
if (new_prewhere_required_columns.empty() && !prewhere_required_columns.empty())
@ -4732,6 +4793,7 @@ std::optional<ProjectionCandidate> MergeTreeData::getQueryProcessingStageWithAgg
if (candidate.prewhere_info->row_level_filter)
{
auto row_level_filter_actions = candidate.prewhere_info->row_level_filter->clone();
// row_level_filter_action should not add missing keys.
auto new_prewhere_required_columns = row_level_filter_actions->foldActionsByProjection(
prewhere_required_columns, projection.sample_block_for_keys, candidate.prewhere_info->row_level_column_name, false);
if (new_prewhere_required_columns.empty() && !prewhere_required_columns.empty())
@ -4743,6 +4805,7 @@ std::optional<ProjectionCandidate> MergeTreeData::getQueryProcessingStageWithAgg
if (candidate.prewhere_info->alias_actions)
{
auto alias_actions = candidate.prewhere_info->alias_actions->clone();
// alias_action should not add missing keys.
auto new_prewhere_required_columns
= alias_actions->foldActionsByProjection(prewhere_required_columns, projection.sample_block_for_keys, {}, false);
if (new_prewhere_required_columns.empty() && !prewhere_required_columns.empty())
@ -4780,6 +4843,18 @@ std::optional<ProjectionCandidate> MergeTreeData::getQueryProcessingStageWithAgg
sample_block_for_keys.insertUnique(column);
}
// If optimize_aggregation_in_order = true, we need additional information to transform the projection's pipeline.
auto attach_aggregation_in_order_info = [&]()
{
for (const auto & key : keys)
{
auto actions_dag = analysis_result.before_aggregation->clone();
actions_dag->foldActionsByProjection({key}, sample_block_for_keys);
candidate.group_by_elements_actions.emplace_back(std::make_shared<ExpressionActions>(actions_dag, actions_settings));
candidate.group_by_elements_order_descr.emplace_back(key, 1, 1);
}
};
if (projection.type == ProjectionDescription::Type::Aggregate && analysis_result.need_aggregate && can_use_aggregate_projection)
{
bool match = true;
@ -4789,16 +4864,13 @@ std::optional<ProjectionCandidate> MergeTreeData::getQueryProcessingStageWithAgg
{
const auto * column = sample_block.findByName(aggregate.column_name);
if (column)
{
aggregates.insert(*column);
}
else
{
match = false;
break;
}
}
if (!match)
return;
@ -4814,14 +4886,7 @@ std::optional<ProjectionCandidate> MergeTreeData::getQueryProcessingStageWithAgg
return;
if (analysis_result.optimize_aggregation_in_order)
{
for (const auto & key : keys)
{
auto actions_dag = analysis_result.before_aggregation->clone();
actions_dag->foldActionsByProjection({key}, sample_block_for_keys);
candidate.group_by_elements_actions.emplace_back(std::make_shared<ExpressionActions>(actions_dag, actions_settings));
}
}
attach_aggregation_in_order_info();
// Reorder aggregation keys and attach aggregates
candidate.before_aggregation->reorderAggregationKeysForProjection(key_name_pos_map);
@ -4835,19 +4900,24 @@ std::optional<ProjectionCandidate> MergeTreeData::getQueryProcessingStageWithAgg
candidates.push_back(std::move(candidate));
}
}
if (projection.type == ProjectionDescription::Type::Normal && (analysis_result.hasWhere() || analysis_result.hasPrewhere()))
else if (projection.type == ProjectionDescription::Type::Normal)
{
const auto & actions
= analysis_result.before_aggregation ? analysis_result.before_aggregation : analysis_result.before_order_by;
NameSet required_columns;
for (const auto & column : actions->getRequiredColumns())
required_columns.insert(column.name);
if (analysis_result.before_aggregation && analysis_result.optimize_aggregation_in_order)
attach_aggregation_in_order_info();
if (rewrite_before_where(candidate, projection, required_columns, sample_block, {}))
if (analysis_result.hasWhere() || analysis_result.hasPrewhere())
{
candidate.required_columns = {required_columns.begin(), required_columns.end()};
candidates.push_back(std::move(candidate));
const auto & actions
= analysis_result.before_aggregation ? analysis_result.before_aggregation : analysis_result.before_order_by;
NameSet required_columns;
for (const auto & column : actions->getRequiredColumns())
required_columns.insert(column.name);
if (rewrite_before_where(candidate, projection, required_columns, sample_block, {}))
{
candidate.required_columns = {required_columns.begin(), required_columns.end()};
candidates.push_back(std::move(candidate));
}
}
}
};
@ -4876,9 +4946,15 @@ std::optional<ProjectionCandidate> MergeTreeData::getQueryProcessingStageWithAgg
{
DataPartsVector normal_parts;
query_info.minmax_count_projection_block = getMinMaxCountProjectionBlock(
metadata_snapshot, minmax_conut_projection_candidate->required_columns, query_info, parts, normal_parts, query_context);
metadata_snapshot,
minmax_conut_projection_candidate->required_columns,
query_info,
parts,
normal_parts,
max_added_blocks.get(),
query_context);
if (minmax_conut_projection_candidate->prewhere_info)
if (query_info.minmax_count_projection_block && minmax_conut_projection_candidate->prewhere_info)
{
const auto & prewhere_info = minmax_conut_projection_candidate->prewhere_info;
if (prewhere_info->alias_actions)

View File

@ -43,6 +43,7 @@ class MergeTreePartsMover;
class MergeTreeDataMergerMutator;
class MutationCommands;
class Context;
using PartitionIdToMaxBlock = std::unordered_map<String, Int64>;
struct JobAndPool;
struct ZeroCopyLock;
@ -391,6 +392,7 @@ public:
const SelectQueryInfo & query_info,
const DataPartsVector & parts,
DataPartsVector & normal_parts,
const PartitionIdToMaxBlock * max_block_numbers_to_read,
ContextPtr query_context) const;
std::optional<ProjectionCandidate> getQueryProcessingStageWithAggregateProjection(

View File

@ -168,7 +168,7 @@ ReplicatedMergeTreeTableMetadata ReplicatedMergeTreeTableMetadata::parse(const S
}
void ReplicatedMergeTreeTableMetadata::checkImmutableFieldsEquals(const ReplicatedMergeTreeTableMetadata & from_zk) const
void ReplicatedMergeTreeTableMetadata::checkImmutableFieldsEquals(const ReplicatedMergeTreeTableMetadata & from_zk, const ColumnsDescription & columns, ContextPtr context) const
{
if (data_format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
{
@ -203,9 +203,12 @@ void ReplicatedMergeTreeTableMetadata::checkImmutableFieldsEquals(const Replicat
/// NOTE: You can make a less strict check of match expressions so that tables do not break from small changes
/// in formatAST code.
if (primary_key != from_zk.primary_key)
String parsed_zk_primary_key = formattedAST(KeyDescription::parse(from_zk.primary_key, columns, context).expression_list_ast);
if (primary_key != parsed_zk_primary_key)
throw Exception("Existing table metadata in ZooKeeper differs in primary key."
" Stored in ZooKeeper: " + from_zk.primary_key + ", local: " + primary_key,
" Stored in ZooKeeper: " + from_zk.primary_key +
", parsed from ZooKeeper: " + parsed_zk_primary_key +
", local: " + primary_key,
ErrorCodes::METADATA_MISMATCH);
if (data_format_version != from_zk.data_format_version)
@ -214,39 +217,53 @@ void ReplicatedMergeTreeTableMetadata::checkImmutableFieldsEquals(const Replicat
", local: " + DB::toString(data_format_version.toUnderType()),
ErrorCodes::METADATA_MISMATCH);
if (partition_key != from_zk.partition_key)
String parsed_zk_partition_key = formattedAST(KeyDescription::parse(from_zk.partition_key, columns, context).expression_list_ast);
if (partition_key != parsed_zk_partition_key)
throw Exception(
"Existing table metadata in ZooKeeper differs in partition key expression."
" Stored in ZooKeeper: " + from_zk.partition_key + ", local: " + partition_key,
" Stored in ZooKeeper: " + from_zk.partition_key +
", parsed from ZooKeeper: " + parsed_zk_partition_key +
", local: " + partition_key,
ErrorCodes::METADATA_MISMATCH);
}
void ReplicatedMergeTreeTableMetadata::checkEquals(const ReplicatedMergeTreeTableMetadata & from_zk, const ColumnsDescription & columns, ContextPtr context) const
{
checkImmutableFieldsEquals(from_zk);
checkImmutableFieldsEquals(from_zk, columns, context);
if (sampling_expression != from_zk.sampling_expression)
throw Exception("Existing table metadata in ZooKeeper differs in sample expression."
" Stored in ZooKeeper: " + from_zk.sampling_expression + ", local: " + sampling_expression,
ErrorCodes::METADATA_MISMATCH);
if (sorting_key != from_zk.sorting_key)
String parsed_zk_sampling_expression = formattedAST(KeyDescription::parse(from_zk.sampling_expression, columns, context).definition_ast);
if (sampling_expression != parsed_zk_sampling_expression)
{
throw Exception(
"Existing table metadata in ZooKeeper differs in sorting key expression."
" Stored in ZooKeeper: " + from_zk.sorting_key + ", local: " + sorting_key,
"Existing table metadata in ZooKeeper differs in sample expression."
" Stored in ZooKeeper: " + from_zk.sampling_expression +
", parsed from ZooKeeper: " + parsed_zk_sampling_expression +
", local: " + sampling_expression,
ErrorCodes::METADATA_MISMATCH);
}
if (ttl_table != from_zk.ttl_table)
String parsed_zk_sorting_key = formattedAST(extractKeyExpressionList(KeyDescription::parse(from_zk.sorting_key, columns, context).definition_ast));
if (sorting_key != parsed_zk_sorting_key)
{
throw Exception(
"Existing table metadata in ZooKeeper differs in TTL."
" Stored in ZooKeeper: " + from_zk.ttl_table +
", local: " + ttl_table,
ErrorCodes::METADATA_MISMATCH);
"Existing table metadata in ZooKeeper differs in sorting key expression."
" Stored in ZooKeeper: " + from_zk.sorting_key +
", parsed from ZooKeeper: " + parsed_zk_sorting_key +
", local: " + sorting_key,
ErrorCodes::METADATA_MISMATCH);
}
auto parsed_primary_key = KeyDescription::parse(primary_key, columns, context);
String parsed_zk_ttl_table = formattedAST(TTLTableDescription::parse(from_zk.ttl_table, columns, context, parsed_primary_key).definition_ast);
if (ttl_table != parsed_zk_ttl_table)
{
throw Exception(
"Existing table metadata in ZooKeeper differs in TTL."
" Stored in ZooKeeper: " + from_zk.ttl_table +
", parsed from ZooKeeper: " + parsed_zk_ttl_table +
", local: " + ttl_table,
ErrorCodes::METADATA_MISMATCH);
}
String parsed_zk_skip_indices = IndicesDescription::parse(from_zk.skip_indices, columns, context).toString();
@ -290,10 +307,10 @@ void ReplicatedMergeTreeTableMetadata::checkEquals(const ReplicatedMergeTreeTabl
}
ReplicatedMergeTreeTableMetadata::Diff
ReplicatedMergeTreeTableMetadata::checkAndFindDiff(const ReplicatedMergeTreeTableMetadata & from_zk) const
ReplicatedMergeTreeTableMetadata::checkAndFindDiff(const ReplicatedMergeTreeTableMetadata & from_zk, const ColumnsDescription & columns, ContextPtr context) const
{
checkImmutableFieldsEquals(from_zk);
checkImmutableFieldsEquals(from_zk, columns, context);
Diff diff;

View File

@ -70,11 +70,11 @@ struct ReplicatedMergeTreeTableMetadata
void checkEquals(const ReplicatedMergeTreeTableMetadata & from_zk, const ColumnsDescription & columns, ContextPtr context) const;
Diff checkAndFindDiff(const ReplicatedMergeTreeTableMetadata & from_zk) const;
Diff checkAndFindDiff(const ReplicatedMergeTreeTableMetadata & from_zk, const ColumnsDescription & columns, ContextPtr context) const;
private:
void checkImmutableFieldsEquals(const ReplicatedMergeTreeTableMetadata & from_zk) const;
void checkImmutableFieldsEquals(const ReplicatedMergeTreeTableMetadata & from_zk, const ColumnsDescription & columns, ContextPtr context) const;
bool index_granularity_bytes_found_in_zk = false;
};

View File

@ -277,10 +277,16 @@ Pipe StorageMaterializedPostgreSQL::read(
size_t max_block_size,
unsigned num_streams)
{
auto materialized_table_lock = lockForShare(String(), context_->getSettingsRef().lock_acquire_timeout);
auto nested_table = getNested();
return readFinalFromNestedStorage(nested_table, column_names, metadata_snapshot,
auto pipe = readFinalFromNestedStorage(nested_table, column_names, metadata_snapshot,
query_info, context_, processed_stage, max_block_size, num_streams);
auto lock = lockForShare(context_->getCurrentQueryId(), context_->getSettingsRef().lock_acquire_timeout);
pipe.addTableLock(lock);
pipe.addStorageHolder(shared_from_this());
return pipe;
}

View File

@ -107,7 +107,9 @@ ProjectionDescription::getProjectionFromAST(const ASTPtr & definition_ast, const
auto external_storage_holder = std::make_shared<TemporaryTableHolder>(query_context, columns, ConstraintsDescription{});
StoragePtr storage = external_storage_holder->getTable();
InterpreterSelectQuery select(
result.query_ast, query_context, storage, {}, SelectQueryOptions{QueryProcessingStage::WithMergeableState}.modify().ignoreAlias());
result.query_ast, query_context, storage, {},
/// Here we ignore ast optimizations because otherwise aggregation keys may be removed from result header as constants.
SelectQueryOptions{QueryProcessingStage::WithMergeableState}.modify().ignoreAlias().ignoreASTOptimizationsAlias());
result.required_columns = select.getRequiredColumns();
result.sample_block = select.getSampleBlock();
@ -179,7 +181,7 @@ ProjectionDescription::getProjectionFromAST(const ASTPtr & definition_ast, const
ProjectionDescription ProjectionDescription::getMinMaxCountProjection(
const ColumnsDescription & columns,
const ASTPtr & partition_columns,
ASTPtr partition_columns,
const Names & minmax_columns,
const ASTs & primary_key_asts,
ContextPtr query_context)
@ -203,7 +205,12 @@ ProjectionDescription ProjectionDescription::getMinMaxCountProjection(
select_query->setExpression(ASTProjectionSelectQuery::Expression::SELECT, std::move(select_expression_list));
if (partition_columns && !partition_columns->children.empty())
{
partition_columns = partition_columns->clone();
for (const auto & partition_column : partition_columns->children)
KeyDescription::moduloToModuloLegacyRecursive(partition_column);
select_query->setExpression(ASTProjectionSelectQuery::Expression::GROUP_BY, partition_columns->clone());
}
result.definition_ast = select_query;
result.name = MINMAX_COUNT_PROJECTION_NAME;

View File

@ -73,7 +73,7 @@ struct ProjectionDescription
static ProjectionDescription getMinMaxCountProjection(
const ColumnsDescription & columns,
const ASTPtr & partition_columns,
ASTPtr partition_columns,
const Names & minmax_columns,
const ASTs & primary_key_asts,
ContextPtr query_context);

View File

@ -57,6 +57,7 @@ Pipe readFinalFromNestedStorage(
Pipe pipe = nested_storage->read(require_columns_name, nested_metadata, query_info, context, processed_stage, max_block_size, num_streams);
pipe.addTableLock(lock);
pipe.addStorageHolder(nested_storage);
if (!expressions->children.empty() && !pipe.empty())
{

View File

@ -127,6 +127,7 @@ struct ProjectionCandidate
ReadInOrderOptimizerPtr order_optimizer;
InputOrderInfoPtr input_order_info;
ManyExpressionActions group_by_elements_actions;
SortDescription group_by_elements_order_descr;
std::shared_ptr<SubqueriesForSets> subqueries_for_sets;
MergeTreeDataSelectAnalysisResultPtr merge_tree_projection_select_result_ptr;
MergeTreeDataSelectAnalysisResultPtr merge_tree_normal_select_result_ptr;

View File

@ -199,6 +199,8 @@ QueryProcessingStage::Enum StorageBuffer::getQueryProcessingStage(
if (destination.get() == this)
throw Exception("Destination table is myself. Read will cause infinite loop.", ErrorCodes::INFINITE_LOOP);
/// TODO: Find a way to support projections for StorageBuffer
query_info.ignore_projections = true;
return destination->getQueryProcessingStage(local_context, to_stage, destination->getInMemoryMetadataPtr(), query_info);
}
@ -365,9 +367,10 @@ void StorageBuffer::read(
*/
if (processed_stage > QueryProcessingStage::FetchColumns)
{
/// TODO: Find a way to support projections for StorageBuffer
auto interpreter = InterpreterSelectQuery(
query_info.query, local_context, std::move(pipe_from_buffers),
SelectQueryOptions(processed_stage));
SelectQueryOptions(processed_stage).ignoreProjections());
interpreter.buildQueryPlan(buffers_plan);
}
else

View File

@ -135,6 +135,10 @@ QueryProcessingStage::Enum StorageMaterializedView::getQueryProcessingStage(
const StorageMetadataPtr &,
SelectQueryInfo & query_info) const
{
    /// TODO: Find a way to support projections for StorageMaterializedView. Why do we use different
    /// metadata for the materialized view and the target table? If they are the same, we can get rid of
    /// all the conversions and use it just like a normal view.
query_info.ignore_projections = true;
return getTargetTable()->getQueryProcessingStage(local_context, to_stage, getTargetTable()->getInMemoryMetadataPtr(), query_info);
}

View File

@ -188,6 +188,8 @@ QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage(
size_t selected_table_size = 0;
/// TODO: Find a way to support projections for StorageMerge
query_info.ignore_projections = true;
for (const auto & iterator : database_table_iterators)
{
while (iterator->isValid())
@ -471,7 +473,9 @@ Pipe StorageMerge::createSources(
modified_context->setSetting("max_threads", streams_num);
modified_context->setSetting("max_streams_to_max_threads_ratio", 1);
InterpreterSelectQuery interpreter{modified_query_info.query, modified_context, SelectQueryOptions(processed_stage)};
/// TODO: Find a way to support projections for StorageMerge
InterpreterSelectQuery interpreter{
modified_query_info.query, modified_context, SelectQueryOptions(processed_stage).ignoreProjections()};
pipe = QueryPipelineBuilder::getPipe(interpreter.buildQueryPipeline());

View File

@ -1,6 +1,7 @@
#pragma once
#include <Storages/IStorage.h>
#include <Storages/SelectQueryInfo.h>
#include <QueryPipeline/Pipe.h>
@ -37,6 +38,8 @@ public:
const StorageMetadataPtr &,
SelectQueryInfo & info) const override
{
/// TODO: Find a way to support projections for StorageProxy
info.ignore_projections = true;
return getNested()->getQueryProcessingStage(context, to_stage, getNested()->getInMemoryMetadataPtr(), info);
}

View File

@ -4492,7 +4492,7 @@ bool StorageReplicatedMergeTree::executeMetadataAlter(const StorageReplicatedMer
auto alter_lock_holder = lockForAlter(getSettings()->lock_acquire_timeout_for_background_operations);
LOG_INFO(log, "Metadata changed in ZooKeeper. Applying changes locally.");
auto metadata_diff = ReplicatedMergeTreeTableMetadata(*this, getInMemoryMetadataPtr()).checkAndFindDiff(metadata_from_entry);
auto metadata_diff = ReplicatedMergeTreeTableMetadata(*this, getInMemoryMetadataPtr()).checkAndFindDiff(metadata_from_entry, getInMemoryMetadataPtr()->getColumns(), getContext());
setTableStructure(std::move(columns_from_entry), metadata_diff);
metadata_version = entry.alter_version;

View File

@ -82,16 +82,28 @@ ColumnsDescription IStorageURLBase::getTableStructureFromData(
const std::optional<FormatSettings> & format_settings,
ContextPtr context)
{
auto parsed_uri = Poco::URI(uri);
Poco::Net::HTTPBasicCredentials credentials;
std::string user_info = parsed_uri.getUserInfo();
if (!user_info.empty())
{
std::size_t n = user_info.find(':');
if (n != std::string::npos)
{
credentials.setUsername(user_info.substr(0, n));
credentials.setPassword(user_info.substr(n + 1));
}
}
auto read_buffer_creator = [&]()
{
auto parsed_uri = Poco::URI(uri);
return wrapReadBufferWithCompressionMethod(
std::make_unique<ReadWriteBufferFromHTTP>(
parsed_uri,
Poco::Net::HTTPRequest::HTTP_GET,
nullptr,
ConnectionTimeouts::getHTTPTimeouts(context),
Poco::Net::HTTPBasicCredentials{},
credentials,
context->getSettingsRef().max_http_get_redirects,
DBMS_DEFAULT_BUFFER_SIZE,
context->getReadSettings(),
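A hedged sketch of the user-visible effect: credentials embedded in the URL's userinfo are now parsed into HTTPBasicCredentials, so schema inference authenticates the same way the read path does. Host, path, and credentials below are hypothetical.

DESCRIBE url('https://user:password@example.com/data.csv', 'CSVWithNames');
SELECT * FROM url('https://user:password@example.com/data.csv', 'CSVWithNames');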

View File

@ -15,6 +15,9 @@
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <Interpreters/FunctionNameNormalizer.h>
#include <Parsers/ExpressionListParsers.h>
#include <Parsers/parseQuery.h>
namespace DB
@ -370,4 +373,17 @@ TTLTableDescription TTLTableDescription::getTTLForTableFromAST(
return result;
}
TTLTableDescription TTLTableDescription::parse(const String & str, const ColumnsDescription & columns, ContextPtr context, const KeyDescription & primary_key)
{
TTLTableDescription result;
if (str.empty())
return result;
ParserTTLExpressionList parser;
ASTPtr ast = parseQuery(parser, str, 0, DBMS_DEFAULT_MAX_PARSER_DEPTH);
FunctionNameNormalizer().visit(ast.get());
return getTTLForTableFromAST(ast, columns, context, primary_key);
}
}
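The string accepted by TTLTableDescription::parse is a TTL expression list in SQL syntax, normalized via FunctionNameNormalizer before being interpreted. A minimal sketch of the kind of TTL declaration it round-trips (table and column names are illustrative):

CREATE TABLE ttl_demo (d Date, v UInt32)
ENGINE = MergeTree ORDER BY d
TTL d + INTERVAL 1 MONTH DELETE;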

View File

@ -118,6 +118,9 @@ struct TTLTableDescription
static TTLTableDescription getTTLForTableFromAST(
const ASTPtr & definition_ast, const ColumnsDescription & columns, ContextPtr context, const KeyDescription & primary_key);
/// Parse description from string
static TTLTableDescription parse(const String & str, const ColumnsDescription & columns, ContextPtr context, const KeyDescription & primary_key);
};
}

View File

@ -4,6 +4,9 @@ endif()
if (TARGET ch_contrib::bzip2)
set(USE_BZIP2 1)
endif()
if (TARGET ch_contrib::minizip)
set(USE_MINIZIP 1)
endif()
if (TARGET ch_contrib::snappy)
set(USE_SNAPPY 1)
endif()

View File

@ -177,13 +177,13 @@ def test_insert_query_delimiter():
assert query("SELECT a FROM t ORDER BY a") == "1\n5\n234\n"
def test_insert_default_column():
query("CREATE TABLE t (a UInt8, b Int32 DEFAULT 100, c String DEFAULT 'c') ENGINE = Memory")
query("CREATE TABLE t (a UInt8, b Int32 DEFAULT 100 - a, c String DEFAULT 'c') ENGINE = Memory")
query("INSERT INTO t (c, a) VALUES ('x',1),('y',2)")
query("INSERT INTO t (a) FORMAT TabSeparated", input_data="3\n4\n")
assert query("SELECT * FROM t ORDER BY a") == "1\t100\tx\n" \
"2\t100\ty\n" \
"3\t100\tc\n" \
"4\t100\tc\n"
assert query("SELECT * FROM t ORDER BY a") == "1\t99\tx\n" \
"2\t98\ty\n" \
"3\t97\tc\n" \
"4\t96\tc\n"
def test_insert_splitted_row():
query("CREATE TABLE t (a UInt8) ENGINE = Memory")
@ -257,7 +257,7 @@ def test_progress():
}
]"""
def test_session():
def test_session_settings():
session_a = "session A"
session_b = "session B"
query("SET custom_x=1", session_id=session_a)
@ -267,9 +267,22 @@ def test_session():
assert query("SELECT getSetting('custom_x'), getSetting('custom_y')", session_id=session_a) == "1\t2\n"
assert query("SELECT getSetting('custom_x'), getSetting('custom_y')", session_id=session_b) == "3\t4\n"
def test_session_temp_tables():
session_a = "session A"
session_b = "session B"
query("CREATE TEMPORARY TABLE my_temp_table(a Int8)", session_id=session_a)
query("INSERT INTO my_temp_table VALUES (10)", session_id=session_a)
assert query("SELECT * FROM my_temp_table", session_id=session_a) == "10\n"
query("CREATE TEMPORARY TABLE my_temp_table(a Int8)", session_id=session_b)
query("INSERT INTO my_temp_table VALUES (20)", session_id=session_b)
assert query("SELECT * FROM my_temp_table", session_id=session_b) == "20\n"
assert query("SELECT * FROM my_temp_table", session_id=session_a) == "10\n"
def test_no_session():
e = query_and_get_error("SET custom_x=1")
assert "There is no session" in e.display_text
e = query_and_get_error("CREATE TEMPORARY TABLE my_temp_table(a Int8)")
assert "There is no session" in e.display_text
def test_input_function():
query("CREATE TABLE t (a UInt8) ENGINE = Memory")

View File

@ -94,9 +94,9 @@ def test_quota_from_users_xml():
system_quota_usage(
[["myQuota", "default", 31556952, 1, 1000, 1, 500, 0, 500, 0, "\\N", 50, "\\N", 200, "\\N", 50, 1000, 200, "\\N", "\\N"]])
instance.query("SELECT COUNT() from test_table")
instance.query("SELECT SUM(x) from test_table")
system_quota_usage(
[["myQuota", "default", 31556952, 2, 1000, 2, 500, 0, 500, 0, "\\N", 51, "\\N", 208, "\\N", 50, 1000, 200, "\\N", "\\N"]])
[["myQuota", "default", 31556952, 2, 1000, 2, 500, 0, 500, 0, "\\N", 51, "\\N", 208, "\\N", 100, 1000, 400, "\\N", "\\N"]])
def test_simpliest_quota():
@ -125,9 +125,9 @@ def test_tracking_quota():
system_quota_usage(
[["myQuota", "default", 31556952, 1, "\\N", 1, "\\N", 0, "\\N", 0, "\\N", 50, "\\N", 200, "\\N", 50, "\\N", 200, "\\N", "\\N"]])
instance.query("SELECT COUNT() from test_table")
instance.query("SELECT SUM(x) from test_table")
system_quota_usage(
[["myQuota", "default", 31556952, 2, "\\N", 2, "\\N", 0, "\\N", 0, "\\N", 51, "\\N", 208, "\\N", 50, "\\N", 200, "\\N", "\\N"]])
[["myQuota", "default", 31556952, 2, "\\N", 2, "\\N", 0, "\\N", 0, "\\N", 51, "\\N", 208, "\\N", 100, "\\N", 400, "\\N", "\\N"]])
def test_exceed_quota():
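The expected read_rows and read_bytes double (50 to 100, 200 to 400) because SELECT COUNT() was served by the trivial-count optimization and read no rows, while SELECT SUM(x) actually scans the 50 rows (200 bytes), so the second query now adds to the cumulative counters. A hedged sketch of a quota tracking these counters (name and limits are illustrative):

CREATE QUOTA my_quota FOR INTERVAL 1 year MAX queries = 1000, read_rows = 10000 TO default;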

View File

@ -30,5 +30,10 @@
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3_arrow>
<s3_parquet2>
<url>http://minio1:9001/root/test.parquet</url>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3_parquet2>
</named_collections>
</clickhouse>

View File

@ -981,3 +981,16 @@ def test_format_detection(started_cluster):
result = instance.query(f"select * from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test.arrow')")
assert(int(result) == 1)
instance.query(f"create table parquet_table_s3 (x UInt64) engine=S3(s3_parquet2)")
instance.query(f"insert into parquet_table_s3 select 1")
result = instance.query(f"select * from s3(s3_parquet2)")
assert(int(result) == 1)
result = instance.query(f"select * from url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test.parquet')")
assert(int(result) == 1)
result = instance.query(f"select * from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test.parquet')")
assert(int(result) == 1)

View File

@ -118,6 +118,7 @@ SYSTEM THREAD FUZZER ['SYSTEM START THREAD FUZZER','SYSTEM STOP THREAD FUZZER','
SYSTEM [] \N ALL
dictGet ['dictHas','dictGetHierarchy','dictIsIn'] DICTIONARY ALL
addressToLine [] GLOBAL INTROSPECTION
addressToLineWithInlines [] GLOBAL INTROSPECTION
addressToSymbol [] GLOBAL INTROSPECTION
demangle [] GLOBAL INTROSPECTION
INTROSPECTION ['INTROSPECTION FUNCTIONS'] \N ALL
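A hedged sketch of enabling the new function (user name is illustrative): per the grants table above it is grantable individually or via the INTROSPECTION umbrella privilege, and calling it also requires allow_introspection_functions.

GRANT addressToLineWithInlines ON *.* TO some_user;  -- or: GRANT INTROSPECTION ON *.* TO some_user;
SET allow_introspection_functions = 1;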

View File

@ -31,7 +31,7 @@ select count() from test_tuple where toDate(p) > '2020-09-01';
-- optimized
select count() from test_tuple where toDate(p) > '2020-09-01' and i = 1;
-- optimized
select count() from test_tuple where i > 1;
select count() from test_tuple where i > 2;
-- optimized
select count() from test_tuple where i < 1;
-- non-optimized

View File

@ -1,2 +1,2 @@
CREATE TABLE default.x\n(\n `i` Int32,\n INDEX mm rand() TYPE minmax GRANULARITY 1,\n INDEX nn rand() TYPE minmax GRANULARITY 1,\n PROJECTION p\n (\n SELECT max(i)\n ),\n PROJECTION p2\n (\n SELECT min(i)\n )\n)\nENGINE = ReplicatedMergeTree(\'/clickhouse/tables/default/x\', \'r\')\nORDER BY i\nSETTINGS index_granularity = 8192
metadata format version: 1\ndate column: \nsampling expression: \nindex granularity: 8192\nmode: 0\nsign column: \nprimary key: i\ndata format version: 1\npartition key: \nindices: mm rand() TYPE minmax GRANULARITY 1, nn rand() TYPE minmax GRANULARITY 1\nprojections: p(SELECT max(i)), p2(SELECT min(i))\ngranularity bytes: 10485760\n
metadata format version: 1\ndate column: \nsampling expression: \nindex granularity: 8192\nmode: 0\nsign column: \nprimary key: i\ndata format version: 1\npartition key: \nindices: mm rand() TYPE minmax GRANULARITY 1, nn rand() TYPE minmax GRANULARITY 1\nprojections: p (SELECT max(i)), p2 (SELECT min(i))\ngranularity bytes: 10485760\n

View File

@ -13,7 +13,7 @@
1
1
1
\N 2021-10-27 10:00:00 4
2021-10-24 10:00:00
\N 2021-10-27 10:00:00 3
0
2021-10-24 10:00:00
0

View File

@ -53,7 +53,7 @@ select count() from d group by toDate(dt);
-- fuzz crash
SELECT pointInEllipses(min(j), NULL), max(dt), count('0.0000000007') FROM d WHERE toDate(dt) >= '2021-10-25';
SELECT min(dt) FROM d PREWHERE ceil(j) <= 0;
SELECT min(j) FROM d PREWHERE ceil(j) <= 0;
SELECT min(dt) FROM d PREWHERE ((0.9998999834060669 AND 1023) AND 255) <= ceil(j);
SELECT count('') AND NULL FROM d PREWHERE ceil(j) <= NULL;

View File

@ -0,0 +1,20 @@
276078600
291519000
304558200
317518200
330478200
276078600
291519000
304558200
317518200
330478200
276078600
291519000
304558200
317518200
330478200
276078600
291519000
304558200
317518200
330478200

View File

@ -0,0 +1,59 @@
DROP TABLE IF EXISTS normal;
CREATE TABLE normal
(
`key` UInt32,
`ts` DateTime,
`value` UInt32,
PROJECTION aaaa
(
SELECT
ts,
key,
value
ORDER BY ts, key
)
)
ENGINE = MergeTree
ORDER BY (key, ts);
INSERT INTO normal SELECT
1,
toDateTime('2021-12-06 00:00:00') + number,
number
FROM numbers(100000);
SET allow_experimental_projection_optimization=1, optimize_aggregation_in_order=1, force_optimize_projection=1;
WITH toStartOfHour(ts) AS a SELECT sum(value) v FROM normal WHERE ts > '2021-12-06 22:00:00' GROUP BY a ORDER BY v LIMIT 5;
WITH toStartOfHour(ts) AS a SELECT sum(value) v FROM normal WHERE ts > '2021-12-06 22:00:00' GROUP BY toStartOfHour(ts), a ORDER BY v LIMIT 5;
DROP TABLE IF EXISTS agg;
CREATE TABLE agg
(
`key` UInt32,
`ts` DateTime,
`value` UInt32,
PROJECTION aaaa
(
SELECT
ts,
key,
sum(value)
GROUP BY ts, key
)
)
ENGINE = MergeTree
ORDER BY (key, ts);
INSERT INTO agg SELECT
1,
toDateTime('2021-12-06 00:00:00') + number,
number
FROM numbers(100000);
SET allow_experimental_projection_optimization=1, optimize_aggregation_in_order=1, force_optimize_projection = 1;
WITH toStartOfHour(ts) AS a SELECT sum(value) v FROM normal WHERE ts > '2021-12-06 22:00:00' GROUP BY a ORDER BY v LIMIT 5;
WITH toStartOfHour(ts) AS a SELECT sum(value) v FROM normal WHERE ts > '2021-12-06 22:00:00' GROUP BY toStartOfHour(ts), a ORDER BY v LIMIT 5;

View File

@ -1,8 +1,21 @@
drop table if exists t;
create table t (s UInt16, l UInt16, projection p (select s, l order by l)) engine MergeTree order by s;
create table t (s UInt16, l UInt16, projection p (select s, l order by l)) engine MergeTree order by s;
select s from t join (select toUInt16(1) as s) x using (s) settings allow_experimental_projection_optimization = 1;
select s from t join (select toUInt16(1) as s) x using (s) settings allow_experimental_projection_optimization = 0;
drop table t;
drop table if exists mt;
create table mt (id1 Int8, id2 Int8) Engine=MergeTree order by tuple();
select id1 as alias1 from mt all inner join (select id2 as alias1 from mt) as t using (alias1) settings allow_experimental_projection_optimization = 1;
select id1 from mt all inner join (select id2 as id1 from mt) as t using (id1) settings allow_experimental_projection_optimization = 1;
select id2 as id1 from mt all inner join (select id1 from mt) as t using (id1) settings allow_experimental_projection_optimization = 1;
drop table mt;
drop table if exists j;
create table j (id1 Int8, id2 Int8, projection p (select id1, id2 order by id2)) Engine=MergeTree order by id1 settings index_granularity = 1;
insert into j select number, number from numbers(10);
select id1 as alias1 from j all inner join (select id2 as alias1 from j where id2 in (1, 2, 3)) as t using (alias1) where id2 in (2, 3, 4) settings allow_experimental_projection_optimization = 1;
drop table j;

View File

@ -25,8 +25,8 @@ drop table tbl;
drop table if exists XXXX;
create table XXXX (t Int64, f Float64) Engine=MergeTree order by t settings index_granularity=128;
insert into XXXX select number*60, 0 from numbers(100000);
SELECT count() FROM XXXX WHERE indexHint(t = 42);
128
SELECT sum(t) FROM XXXX WHERE indexHint(t = 42);
487680
drop table if exists XXXX;
create table XXXX (t Int64, f Float64) Engine=MergeTree order by t settings index_granularity=8192;
insert into XXXX select number*60, 0 from numbers(100000);

View File

@ -22,7 +22,7 @@ create table XXXX (t Int64, f Float64) Engine=MergeTree order by t settings inde
insert into XXXX select number*60, 0 from numbers(100000);
SELECT count() FROM XXXX WHERE indexHint(t = 42);
SELECT sum(t) FROM XXXX WHERE indexHint(t = 42);
drop table if exists XXXX;

View File

@ -14,6 +14,8 @@ select * from x where _partition_id in (select partitionId(number + 1) from numb
-- trivial count optimization test
set max_rows_to_read = 2; -- one row for subquery + subquery itself
-- TODO: Relax the limit because we might build the prepared set twice with _minmax_count_projection
set max_rows_to_read = 3;
select count() from x where _partition_id in (select partitionId(number + 1) from numbers(1));
drop table x;

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1,2 @@
10000000000
has inlines: 1

View File

@ -0,0 +1,25 @@
-- Tags: no-tsan, no-asan, no-ubsan, no-msan, no-debug
SELECT addressToLineWithInlines(1); -- { serverError 446 }
SET allow_introspection_functions = 1;
SET query_profiler_real_time_period_ns = 0;
SET query_profiler_cpu_time_period_ns = 1000000;
SET log_queries = 1;
SELECT count() FROM numbers_mt(10000000000) SETTINGS log_comment='02161_test_case';
SET log_queries = 0;
SET query_profiler_cpu_time_period_ns = 0;
SYSTEM FLUSH LOGS;
WITH
lineWithInlines AS
(
SELECT DISTINCT addressToLineWithInlines(arrayJoin(trace)) AS lineWithInlines FROM system.trace_log WHERE query_id =
(
SELECT query_id FROM system.query_log WHERE current_database = currentDatabase() AND log_comment='02161_test_case' ORDER BY event_time DESC LIMIT 1
)
)
SELECT 'has inlines:', or(max(length(lineWithInlines)) > 1, max(locate(lineWithInlines[1], ':')) = 0) FROM lineWithInlines SETTINGS short_circuit_function_evaluation='enable';
-- `max(length(lineWithInlines)) > 1` checks that at least one address has inline frames.
-- `max(locate(lineWithInlines[1], ':')) = 0` checks whether none of the addresses could be resolved to a symbol.