Merge branch 'master' of github.com:yandex/ClickHouse

Ivan Blinkov 2018-12-29 10:35:36 +03:00
commit 1d6fe1ba80
468 changed files with 5571 additions and 2426 deletions

.gitmodules

@ -36,7 +36,7 @@
url = https://github.com/ClickHouse-Extras/llvm
[submodule "contrib/mariadb-connector-c"]
path = contrib/mariadb-connector-c
url = https://github.com/MariaDB/mariadb-connector-c.git
url = https://github.com/ClickHouse-Extras/mariadb-connector-c.git
[submodule "contrib/jemalloc"]
path = contrib/jemalloc
url = https://github.com/jemalloc/jemalloc.git


@ -25,11 +25,6 @@ endif ()
# Write compile_commands.json
set(CMAKE_EXPORT_COMPILE_COMMANDS 1)
set (MAX_COMPILER_MEMORY 2000 CACHE INTERNAL "")
set (MAX_LINKER_MEMORY 3500 CACHE INTERNAL "")
include (cmake/limit_jobs.cmake)
include (cmake/find_ccache.cmake)
if (NOT CMAKE_BUILD_TYPE OR CMAKE_BUILD_TYPE STREQUAL "None")


@ -8,23 +8,24 @@ endif ()
if (NOT ZLIB_FOUND)
if (NOT MSVC)
set (INTERNAL_ZLIB_NAME "zlib-ng")
set (INTERNAL_ZLIB_NAME "zlib-ng" CACHE INTERNAL "")
else ()
set (INTERNAL_ZLIB_NAME "zlib")
set (INTERNAL_ZLIB_NAME "zlib" CACHE INTERNAL "")
if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/${INTERNAL_ZLIB_NAME}")
message (WARNING "Will use standard zlib, please clone manually:\n git clone https://github.com/madler/zlib.git ${ClickHouse_SOURCE_DIR}/contrib/${INTERNAL_ZLIB_NAME}")
endif ()
endif ()
set (USE_INTERNAL_ZLIB_LIBRARY 1)
set (ZLIB_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/${INTERNAL_ZLIB_NAME}" "${ClickHouse_BINARY_DIR}/contrib/${INTERNAL_ZLIB_NAME}") # generated zconf.h
set (ZLIB_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/${INTERNAL_ZLIB_NAME}" "${ClickHouse_BINARY_DIR}/contrib/${INTERNAL_ZLIB_NAME}" CACHE INTERNAL "") # generated zconf.h
set (ZLIB_INCLUDE_DIRS ${ZLIB_INCLUDE_DIR}) # for poco
set (ZLIB_INCLUDE_DIRECTORIES ${ZLIB_INCLUDE_DIR}) # for protobuf
set (ZLIB_FOUND 1) # for poco
if (USE_STATIC_LIBRARIES)
set (ZLIB_LIBRARIES zlibstatic)
set (ZLIB_LIBRARIES zlibstatic CACHE INTERNAL "")
else ()
set (ZLIB_LIBRARIES zlib)
set (ZLIB_LIBRARIES zlib CACHE INTERNAL "")
endif ()
endif ()
message (STATUS "Using zlib: ${ZLIB_INCLUDE_DIR} : ${ZLIB_LIBRARIES}")
message (STATUS "Using ${INTERNAL_ZLIB_NAME}: ${ZLIB_INCLUDE_DIR} : ${ZLIB_LIBRARIES}")


@ -6,30 +6,32 @@
cmake_host_system_information(RESULT AVAILABLE_PHYSICAL_MEMORY QUERY AVAILABLE_PHYSICAL_MEMORY) # Not available under freebsd
option(PARALLEL_COMPILE_JOBS "Define the maximum number of concurrent compilation jobs" "")
if (NOT PARALLEL_COMPILE_JOBS AND AVAILABLE_PHYSICAL_MEMORY)
math(EXPR PARALLEL_COMPILE_JOBS ${AVAILABLE_PHYSICAL_MEMORY}/2500) # ~2.5gb max per one compiler
if (NOT PARALLEL_COMPILE_JOBS AND AVAILABLE_PHYSICAL_MEMORY AND MAX_COMPILER_MEMORY)
math(EXPR PARALLEL_COMPILE_JOBS ${AVAILABLE_PHYSICAL_MEMORY}/${MAX_COMPILER_MEMORY})
if (NOT PARALLEL_COMPILE_JOBS)
set (PARALLEL_COMPILE_JOBS 1)
endif ()
endif ()
if (PARALLEL_COMPILE_JOBS)
set_property(GLOBAL APPEND PROPERTY JOB_POOLS compile_job_pool=${PARALLEL_COMPILE_JOBS})
set(CMAKE_JOB_POOL_COMPILE compile_job_pool)
set(CMAKE_JOB_POOL_COMPILE compile_job_pool${CMAKE_CURRENT_SOURCE_DIR})
string (REGEX REPLACE "[^a-zA-Z0-9]+" "_" CMAKE_JOB_POOL_COMPILE ${CMAKE_JOB_POOL_COMPILE})
set_property(GLOBAL APPEND PROPERTY JOB_POOLS ${CMAKE_JOB_POOL_COMPILE}=${PARALLEL_COMPILE_JOBS})
endif ()
option(PARALLEL_LINK_JOBS "Define the maximum number of concurrent link jobs" "")
if (NOT PARALLEL_LINK_JOBS AND AVAILABLE_PHYSICAL_MEMORY)
math(EXPR PARALLEL_LINK_JOBS ${AVAILABLE_PHYSICAL_MEMORY}/4000) # ~4gb max per one linker
if (NOT PARALLEL_LINK_JOBS AND AVAILABLE_PHYSICAL_MEMORY AND MAX_LINKER_MEMORY)
math(EXPR PARALLEL_LINK_JOBS ${AVAILABLE_PHYSICAL_MEMORY}/${MAX_LINKER_MEMORY})
if (NOT PARALLEL_LINK_JOBS)
set (PARALLEL_LINK_JOBS 1)
endif ()
endif ()
if (PARALLEL_COMPILE_JOBS OR PARALLEL_LINK_JOBS)
message(STATUS "Have ${AVAILABLE_PHYSICAL_MEMORY} megabytes of memory. Limiting concurrent linkers jobs to ${PARALLEL_LINK_JOBS} and compiler jobs to ${PARALLEL_COMPILE_JOBS}")
message(STATUS "${CMAKE_CURRENT_SOURCE_DIR}: Have ${AVAILABLE_PHYSICAL_MEMORY} megabytes of memory. Limiting concurrent linkers jobs to ${PARALLEL_LINK_JOBS} and compiler jobs to ${PARALLEL_COMPILE_JOBS}")
endif ()
if (LLVM_PARALLEL_LINK_JOBS)
set_property(GLOBAL APPEND PROPERTY JOB_POOLS link_job_pool=${PARALLEL_LINK_JOBS})
set(CMAKE_JOB_POOL_LINK link_job_pool)
set(CMAKE_JOB_POOL_LINK link_job_pool${CMAKE_CURRENT_SOURCE_DIR})
string (REGEX REPLACE "[^a-zA-Z0-9]+" "_" CMAKE_JOB_POOL_LINK ${CMAKE_JOB_POOL_LINK})
set_property(GLOBAL APPEND PROPERTY JOB_POOLS ${CMAKE_JOB_POOL_LINK}=${PARALLEL_LINK_JOBS})
endif ()


@ -4,8 +4,8 @@ if (CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wno-unused-function -Wno-unused-variable -Wno-unused-but-set-variable -Wno-unused-result -Wno-deprecated-declarations -Wno-maybe-uninitialized -Wno-format -Wno-misleading-indentation -Wno-stringop-overflow")
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-old-style-cast -Wno-unused-function -Wno-unused-variable -Wno-unused-but-set-variable -Wno-unused-result -Wno-deprecated-declarations -Wno-non-virtual-dtor -Wno-maybe-uninitialized -Wno-format -Wno-misleading-indentation -Wno-implicit-fallthrough -Wno-class-memaccess -Wno-sign-compare -std=c++1z")
elseif (CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wno-unused-function -Wno-unused-variable -Wno-unused-result -Wno-deprecated-declarations -Wno-format -Wno-parentheses-equality")
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-old-style-cast -Wno-unused-function -Wno-unused-variable -Wno-unused-result -Wno-deprecated-declarations -Wno-non-virtual-dtor -Wno-format -std=c++1z")
set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wno-unused-function -Wno-unused-variable -Wno-unused-result -Wno-deprecated-declarations -Wno-format -Wno-parentheses-equality -Wno-tautological-constant-compare -Wno-tautological-constant-out-of-range-compare -Wno-implicit-function-declaration -Wno-return-type -Wno-pointer-bool-conversion -Wno-enum-conversion -Wno-int-conversion -Wno-switch")
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-old-style-cast -Wno-unused-function -Wno-unused-variable -Wno-unused-result -Wno-deprecated-declarations -Wno-non-virtual-dtor -Wno-format -Wno-inconsistent-missing-override -std=c++1z")
endif ()
if (USE_INTERNAL_BOOST_LIBRARY)
@ -206,6 +206,7 @@ if (USE_INTERNAL_HDFS3_LIBRARY)
if (USE_INTERNAL_PROTOBUF_LIBRARY)
set(protobuf_BUILD_TESTS OFF CACHE INTERNAL "" FORCE)
set(protobuf_BUILD_SHARED_LIBS OFF CACHE INTERNAL "" FORCE)
set(protobuf_WITH_ZLIB 0 CACHE INTERNAL "" FORCE) # actually will use zlib, but skip find
add_subdirectory(protobuf/cmake)
endif ()
add_subdirectory(libhdfs3-cmake)

@ -1 +1 @@
Subproject commit a0fd36cc5a5313414a5a2ebe9322577a29b4782a
Subproject commit d85d0e98999cd9e28ceb66645999b4a9ce85370e


@ -2,15 +2,23 @@ if (USE_INCLUDE_WHAT_YOU_USE)
set (CMAKE_CXX_INCLUDE_WHAT_YOU_USE ${IWYU_PATH})
endif ()
include(${CMAKE_CURRENT_SOURCE_DIR}/cmake/find_vectorclass.cmake)
set (MAX_COMPILER_MEMORY 2500 CACHE INTERNAL "")
if (MAKE_STATIC_LIBRARIES)
set (MAX_LINKER_MEMORY 3500 CACHE INTERNAL "")
else()
set (MAX_LINKER_MEMORY 2500 CACHE INTERNAL "")
endif ()
include (../cmake/limit_jobs.cmake)
include(cmake/find_vectorclass.cmake)
set (CONFIG_VERSION ${CMAKE_CURRENT_BINARY_DIR}/src/Common/config_version.h)
set (CONFIG_COMMON ${CMAKE_CURRENT_BINARY_DIR}/src/Common/config.h)
include (cmake/version.cmake)
message (STATUS "Will build ${VERSION_FULL}")
configure_file (${CMAKE_CURRENT_SOURCE_DIR}/src/Common/config.h.in ${CONFIG_COMMON})
configure_file (${CMAKE_CURRENT_SOURCE_DIR}/src/Common/config_version.h.in ${CONFIG_VERSION})
configure_file (src/Common/config.h.in ${CONFIG_COMMON})
configure_file (src/Common/config_version.h.in ${CONFIG_VERSION})
if (NOT MSVC)
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wextra")
@ -53,7 +61,7 @@ add_subdirectory (src)
set(dbms_headers)
set(dbms_sources)
include(${ClickHouse_SOURCE_DIR}/cmake/dbms_glob_sources.cmake)
include(../cmake/dbms_glob_sources.cmake)
add_headers_and_sources(clickhouse_common_io src/Common)
add_headers_and_sources(clickhouse_common_io src/Common/HashTable)
@ -142,10 +150,6 @@ if (CMAKE_BUILD_TYPE_UC STREQUAL "RELEASE" OR CMAKE_BUILD_TYPE_UC STREQUAL "RELW
PROPERTIES COMPILE_FLAGS -g0)
endif ()
if (NOT ARCH_ARM AND CPUID_LIBRARY)
set (LINK_LIBRARIES_ONLY_ON_X86_64 ${CPUID_LIBRARY})
endif()
target_link_libraries (clickhouse_common_io
PUBLIC
common
@ -153,8 +157,6 @@ target_link_libraries (clickhouse_common_io
string_utils
widechar_width
${LINK_LIBRARIES_ONLY_ON_X86_64}
${LZ4_LIBRARY}
${ZSTD_LIBRARY}
${DOUBLE_CONVERSION_LIBRARIES}
pocoext
PUBLIC
@ -175,8 +177,13 @@ target_link_libraries (clickhouse_common_io
${CMAKE_DL_LIBS}
)
if (NOT ARCH_ARM AND CPUID_LIBRARY)
target_link_libraries (clickhouse_common_io PRIVATE ${CPUID_LIBRARY})
endif()
target_link_libraries (dbms
PRIVATE
clickhouse_compression
clickhouse_parsers
clickhouse_common_config
PUBLIC
@ -269,13 +276,6 @@ if (USE_HDFS)
target_include_directories (dbms SYSTEM BEFORE PRIVATE ${HDFS3_INCLUDE_DIR})
endif()
if (NOT USE_INTERNAL_LZ4_LIBRARY)
target_include_directories (dbms SYSTEM BEFORE PRIVATE ${LZ4_INCLUDE_DIR})
endif ()
if (NOT USE_INTERNAL_ZSTD_LIBRARY)
target_include_directories (dbms SYSTEM BEFORE PRIVATE ${ZSTD_INCLUDE_DIR})
endif ()
if (USE_JEMALLOC)
target_include_directories (dbms SYSTEM BEFORE PRIVATE ${JEMALLOC_INCLUDE_DIR}) # used in Interpreters/AsynchronousMetrics.cpp
endif ()


@ -666,6 +666,12 @@ private:
const bool test_mode = config().has("testmode");
if (config().has("multiquery"))
{
{ /// disable logs if errors are expected
TestHint test_hint(test_mode, text);
if (test_hint.clientError() || test_hint.serverError())
process("SET send_logs_level = 'none'");
}
/// Several queries separated by ';'.
/// INSERT data is ended by the end of line, not ';'.


@ -5,6 +5,7 @@
#include <iostream>
#include <Core/Types.h>
#include <Common/Exception.h>
#include <Parsers/Lexer.h>
namespace DB
@ -27,25 +28,27 @@ public:
if (!enabled_)
return;
/// TODO: This is absolutely wrong. Fragment may be contained inside string literal.
size_t pos = query.find("--");
Lexer lexer(query.data(), query.data() + query.size());
if (pos != String::npos && query.find("--", pos + 2) != String::npos)
return; /// It's not last comment. Hint belongs to commented query. /// TODO Absolutely wrong: there maybe the following comment for the next query.
if (pos != String::npos)
for (Token token = lexer.nextToken(); !token.isEnd(); token = lexer.nextToken())
{
/// TODO: This is also wrong. Comment may already have ended by line break.
pos = query.find('{', pos + 2);
if (pos != String::npos)
if (token.type == TokenType::Comment)
{
String hint = query.substr(pos + 1);
String comment(token.begin, token.begin + token.size());
/// TODO: And this is wrong for the same reason.
pos = hint.find('}');
hint.resize(pos);
parse(hint);
if (!comment.empty())
{
size_t pos_start = comment.find('{', 0);
if (pos_start != String::npos)
{
size_t pos_end = comment.find('}', pos_start);
if (pos_end != String::npos)
{
String hint(comment.begin() + pos_start + 1, comment.begin() + pos_end);
parse(hint);
}
}
}
}
}
}
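For reference, a standalone sketch of the extraction performed above on a comment token such as `-- { serverError 42 }`, using plain `std::string` logic instead of the ClickHouse `Lexer` (the function name is illustrative, not part of the commit):
```
#include <iostream>
#include <optional>
#include <string>

/// Extract the text between the first '{' and the following '}' of a comment,
/// mirroring the hint parsing done in TestHint above.
std::optional<std::string> extractHint(const std::string & comment)
{
    size_t pos_start = comment.find('{');
    if (pos_start == std::string::npos)
        return std::nullopt;
    size_t pos_end = comment.find('}', pos_start);
    if (pos_end == std::string::npos)
        return std::nullopt;
    return comment.substr(pos_start + 1, pos_end - pos_start - 1);
}

int main()
{
    std::cout << extractHint("-- { serverError 42 }").value_or("<none>") << "\n"; /// prints " serverError 42 "
    return 0;
}
```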


@ -1,5 +1,5 @@
add_library (clickhouse-compressor-lib ${LINK_MODE} Compressor.cpp)
target_link_libraries (clickhouse-compressor-lib PRIVATE clickhouse_common_io ${Boost_PROGRAM_OPTIONS_LIBRARY})
target_link_libraries (clickhouse-compressor-lib PRIVATE clickhouse_compression clickhouse_common_io ${Boost_PROGRAM_OPTIONS_LIBRARY})
if (CLICKHOUSE_SPLIT_BINARY)
# Also in utils


@ -1,21 +1,23 @@
#include <iostream>
#include <optional>
#include <boost/program_options.hpp>
#include <Common/Exception.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/CompressedWriteBuffer.h>
#include <IO/CompressedReadBuffer.h>
#include <Compression/CompressedWriteBuffer.h>
#include <Compression/CompressedReadBuffer.h>
#include <IO/WriteHelpers.h>
#include <IO/copyData.h>
#include <Compression/CompressionFactory.h>
namespace DB
{
namespace ErrorCodes
{
extern const int TOO_LARGE_SIZE_COMPRESSED;
extern const int BAD_ARGUMENTS;
}
}
@ -61,7 +63,8 @@ int mainEntryClickHouseCompressor(int argc, char ** argv)
("block-size,b", boost::program_options::value<unsigned>()->default_value(DBMS_DEFAULT_BUFFER_SIZE), "compress in blocks of specified size")
("hc", "use LZ4HC instead of LZ4")
("zstd", "use ZSTD instead of LZ4")
("level", boost::program_options::value<int>(), "compression level")
("codec", boost::program_options::value<std::vector<std::string>>()->multitoken(), "use codecs combination instead of LZ4")
("level", boost::program_options::value<std::vector<int>>()->multitoken(), "compression levels for codecs specified via --codec")
("none", "use no compression instead of LZ4")
("stat", "print block statistics of compressed data")
;
@ -84,19 +87,45 @@ int mainEntryClickHouseCompressor(int argc, char ** argv)
bool stat_mode = options.count("stat");
bool use_none = options.count("none");
unsigned block_size = options["block-size"].as<unsigned>();
std::vector<std::string> codecs;
if (options.count("codec"))
codecs = options["codec"].as<std::vector<std::string>>();
DB::CompressionMethod method = DB::CompressionMethod::LZ4;
if ((use_lz4hc || use_zstd || use_none) && !codecs.empty())
throw DB::Exception("Wrong options, codec flags like --zstd and --codec options are mutually exclusive", DB::ErrorCodes::BAD_ARGUMENTS);
std::string method_family = "LZ4";
if (use_lz4hc)
method = DB::CompressionMethod::LZ4HC;
method_family = "LZ4HC";
else if (use_zstd)
method = DB::CompressionMethod::ZSTD;
method_family = "ZSTD";
else if (use_none)
method = DB::CompressionMethod::NONE;
method_family = "NONE";
std::vector<int> levels;
if (options.count("level"))
levels = options["level"].as<std::vector<int>>();
DB::CompressionCodecPtr codec;
if (!codecs.empty())
{
if (levels.size() > codecs.size())
throw DB::Exception("Specified more levels than codecs", DB::ErrorCodes::BAD_ARGUMENTS);
std::vector<DB::CodecNameWithLevel> codec_names;
for (size_t i = 0; i < codecs.size(); ++i)
{
if (i < levels.size())
codec_names.emplace_back(codecs[i], levels[i]);
else
codec_names.emplace_back(codecs[i], std::nullopt);
}
codec = DB::CompressionCodecFactory::instance().get(codec_names);
}
else
codec = DB::CompressionCodecFactory::instance().get(method_family, levels.empty() ? std::nullopt : std::optional<int>(levels.back()));
DB::CompressionSettings settings(method, options.count("level")
? options["level"].as<int>()
: DB::CompressionSettings::getDefaultLevel(method));
DB::ReadBufferFromFileDescriptor rb(STDIN_FILENO);
DB::WriteBufferFromFileDescriptor wb(STDOUT_FILENO);
@ -115,7 +144,7 @@ int mainEntryClickHouseCompressor(int argc, char ** argv)
else
{
/// Compression
DB::CompressedWriteBuffer to(wb, settings, block_size);
DB::CompressedWriteBuffer to(wb, codec, block_size);
DB::copyData(rb, to);
}
}
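For reference, a standalone sketch of how the `--codec`/`--level` pairing above behaves for hypothetical inputs (`std::pair` stands in for `DB::CodecNameWithLevel`):
```
#include <iostream>
#include <optional>
#include <string>
#include <utility>
#include <vector>

int main()
{
    /// As if invoked with: --codec ZSTD --level 5 --codec LZ4HC --level 7 --codec LZ4
    std::vector<std::string> codecs = {"ZSTD", "LZ4HC", "LZ4"};
    std::vector<int> levels = {5, 7};

    /// The i-th level goes with the i-th codec; codecs without an explicit level get std::nullopt (no level specified).
    std::vector<std::pair<std::string, std::optional<int>>> codec_names;
    for (size_t i = 0; i < codecs.size(); ++i)
    {
        if (i < levels.size())
            codec_names.emplace_back(codecs[i], levels[i]);
        else
            codec_names.emplace_back(codecs[i], std::nullopt);
    }

    for (const auto & [name, level] : codec_names)
        std::cout << name << " -> " << (level ? std::to_string(*level) : "default") << "\n";
    /// ZSTD -> 5, LZ4HC -> 7, LZ4 -> default
    return 0;
}
```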


@ -0,0 +1,27 @@
## ClickHouse compressor
Simple program for data compression and decompression.
### Examples
Compress data with LZ4:
```
$ ./clickhouse-compressor < input_file > output_file
```
Decompress data from LZ4 format:
```
$ ./clickhouse-compressor --decompress < input_file > output_file
```
Compress data with ZSTD at level 5:
```
$ ./clickhouse-compressor --codec ZSTD --level 5 < input_file > output_file
```
Compress data with ZSTD level 10, LZ4HC level 7 and LZ4:
```
$ ./clickhouse-compressor --codec ZSTD --level 10 --codec LZ4HC --level 7 --codec LZ4 < input_file > output_file
```


@ -12,3 +12,4 @@
#cmakedefine01 ENABLE_CLICKHOUSE_COMPRESSOR
#cmakedefine01 ENABLE_CLICKHOUSE_FORMAT
#cmakedefine01 ENABLE_CLICKHOUSE_OBFUSCATOR
#cmakedefine01 ENABLE_CLICKHOUSE_ODBC_BRIDGE


@ -53,7 +53,7 @@ int mainEntryClickHouseFormat(int argc, char ** argv);
#if ENABLE_CLICKHOUSE_COPIER || !defined(ENABLE_CLICKHOUSE_COPIER)
int mainEntryClickHouseClusterCopier(int argc, char ** argv);
#endif
#if ENABLE_CLICKHOUSE_OBFUSCATOR
#if ENABLE_CLICKHOUSE_OBFUSCATOR || !defined(ENABLE_CLICKHOUSE_OBFUSCATOR)
int mainEntryClickHouseObfuscator(int argc, char ** argv);
#endif
#if ENABLE_CLICKHOUSE_ODBC_BRIDGE || !defined(ENABLE_CLICKHOUSE_ODBC_BRIDGE)
@ -102,7 +102,7 @@ std::pair<const char *, MainFunc> clickhouse_applications[] =
#if ENABLE_CLICKHOUSE_COPIER || !defined(ENABLE_CLICKHOUSE_COPIER)
{"copier", mainEntryClickHouseClusterCopier},
#endif
#if ENABLE_CLICKHOUSE_OBFUSCATOR
#if ENABLE_CLICKHOUSE_OBFUSCATOR || !defined(ENABLE_CLICKHOUSE_OBFUSCATOR)
{"obfuscator", mainEntryClickHouseObfuscator},
#endif
#if ENABLE_CLICKHOUSE_ODBC_BRIDGE || !defined(ENABLE_CLICKHOUSE_ODBC_BRIDGE)
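The `FLAG || !defined(FLAG)` pattern above keeps an entry point available both when the flag is defined to 1 and when it is not defined at all; it is skipped only when the flag is explicitly defined to 0. A minimal illustration with a hypothetical flag:
```
#include <cstdio>

#define ENABLE_FEATURE 1   /// try also 0, or removing the define entirely

#if ENABLE_FEATURE || !defined(ENABLE_FEATURE)
/// Compiled when ENABLE_FEATURE is 1 or when it was never defined; skipped only when it is defined to 0.
int featureEntryPoint() { return 42; }
#endif

int main()
{
#if ENABLE_FEATURE || !defined(ENABLE_FEATURE)
    std::printf("%d\n", featureEntryPoint());
#endif
    return 0;
}
```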


@ -19,8 +19,8 @@
#include <IO/ZlibInflatingReadBuffer.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ConcatReadBuffer.h>
#include <IO/CompressedReadBuffer.h>
#include <IO/CompressedWriteBuffer.h>
#include <Compression/CompressedReadBuffer.h>
#include <Compression/CompressedWriteBuffer.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteBufferFromHTTPServerResponse.h>
#include <IO/WriteBufferFromFile.h>


@ -6,7 +6,7 @@
#include <Common/HTMLForm.h>
#include <Common/setThreadName.h>
#include <IO/CompressedWriteBuffer.h>
#include <Compression/CompressedWriteBuffer.h>
#include <IO/ReadBufferFromIStream.h>
#include <IO/WriteBufferFromHTTPServerResponse.h>
#include <Interpreters/InterserverIOHandler.h>


@ -12,13 +12,12 @@
#include <Common/setThreadName.h>
#include <Common/config_version.h>
#include <IO/Progress.h>
#include <IO/CompressedReadBuffer.h>
#include <IO/CompressedWriteBuffer.h>
#include <Compression/CompressedReadBuffer.h>
#include <Compression/CompressedWriteBuffer.h>
#include <IO/ReadBufferFromPocoSocket.h>
#include <IO/WriteBufferFromPocoSocket.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <IO/CompressionSettings.h>
#include <IO/copyData.h>
#include <DataStreams/AsynchronousBlockInputStream.h>
#include <DataStreams/NativeBlockInputStream.h>
@ -32,6 +31,7 @@
#include <Core/ExternalTable.h>
#include <Storages/ColumnDefault.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <Compression/CompressionFactory.h>
#include "TCPHandler.h"
@ -728,7 +728,7 @@ bool TCPHandler::receiveData()
{
NamesAndTypesList columns = block.getNamesAndTypesList();
storage = StorageMemory::create(external_table_name,
ColumnsDescription{columns, NamesAndTypesList{}, NamesAndTypesList{}, ColumnDefaults{}, ColumnComments{}});
ColumnsDescription{columns, NamesAndTypesList{}, NamesAndTypesList{}, ColumnDefaults{}, ColumnComments{}, ColumnCodecs{}});
storage->startup();
query_context.addExternalTable(external_table_name, storage);
}
@ -772,9 +772,14 @@ void TCPHandler::initBlockOutput(const Block & block)
{
if (!state.maybe_compressed_out)
{
std::string method = query_context.getSettingsRef().network_compression_method;
std::optional<int> level;
if (method == "ZSTD")
level = query_context.getSettingsRef().network_zstd_compression_level;
if (state.compression == Protocol::Compression::Enable)
state.maybe_compressed_out = std::make_shared<CompressedWriteBuffer>(
*out, CompressionSettings(query_context.getSettingsRef()));
*out, CompressionCodecFactory::instance().get(method, level));
else
state.maybe_compressed_out = out;
}


@ -85,7 +85,7 @@ public:
const ColumnArray & first_array_column = static_cast<const ColumnArray &>(*columns[0]);
const IColumn::Offsets & offsets = first_array_column.getOffsets();
size_t begin = row_num == 0 ? 0 : offsets[row_num - 1];
size_t begin = offsets[row_num - 1];
size_t end = offsets[row_num];
/// Sanity check. NOTE We can implement specialization for a case with single argument, if the check will hurt performance.
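The `row_num == 0 ? 0 : ...` branch can be dropped here (and in the similar hunks below) because, later in this commit, PODArray gains left padding that makes `offsets[-1]` a valid, zero-initialized element. A minimal standalone sketch of the idea, using plain standard-library types rather than the ClickHouse columns:
```
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <vector>

int main()
{
    /// Emulate the left padding: the slot in front of the real data acts as a zero-valued offsets[-1].
    std::vector<uint64_t> storage = {0 /* padding slot */, 3, 5, 9};
    uint64_t * offsets = storage.data() + 1;   /// offsets[-1] is valid and always 0

    for (std::ptrdiff_t row = 0; row < 3; ++row)
    {
        uint64_t begin = offsets[row - 1];                 /// no `row == 0 ? 0 : ...` branch needed
        uint64_t size = offsets[row] - offsets[row - 1];
        std::cout << "row " << row << ": begin=" << begin << " size=" << size << "\n";
    }
    return 0;
}
```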


@ -25,7 +25,7 @@ struct AggregateFunctionAvgData
UInt64 count = 0;
template <typename ResultT>
ResultT result() const
ResultT NO_SANITIZE_UNDEFINED result() const
{
if constexpr (std::is_floating_point_v<ResultT>)
if constexpr (std::numeric_limits<ResultT>::is_iec559)


@ -96,7 +96,7 @@ private:
/** Calculates the slope of a line between leftmost and rightmost data points.
* (y2 - y1) / (x2 - x1)
*/
Float64 getBoundingRatio(const AggregateFunctionBoundingRatioData & data) const
Float64 NO_SANITIZE_UNDEFINED getBoundingRatio(const AggregateFunctionBoundingRatioData & data) const
{
if (data.empty)
return std::numeric_limits<Float64>::quiet_NaN();


@ -146,7 +146,7 @@ public:
const ColumnArray & first_array_column = static_cast<const ColumnArray &>(*columns[0]);
const IColumn::Offsets & offsets = first_array_column.getOffsets();
size_t begin = row_num == 0 ? 0 : offsets[row_num - 1];
size_t begin = offsets[row_num - 1];
size_t end = offsets[row_num];
/// Sanity check. NOTE We can implement specialization for a case with single argument, if the check will hurt performance.


@ -77,12 +77,14 @@ public:
if (!limit_num_elems)
{
cur_elems.value.insert(rhs_elems.value.begin(), rhs_elems.value.end(), arena);
if (rhs_elems.value.size())
cur_elems.value.insert(rhs_elems.value.begin(), rhs_elems.value.end(), arena);
}
else
{
UInt64 elems_to_insert = std::min(static_cast<size_t>(max_elems) - cur_elems.value.size(), rhs_elems.value.size());
cur_elems.value.insert(rhs_elems.value.begin(), rhs_elems.value.begin() + elems_to_insert, arena);
if (elems_to_insert)
cur_elems.value.insert(rhs_elems.value.begin(), rhs_elems.value.begin() + elems_to_insert, arena);
}
}
@ -119,10 +121,13 @@ public:
ColumnArray & arr_to = static_cast<ColumnArray &>(to);
ColumnArray::Offsets & offsets_to = arr_to.getOffsets();
offsets_to.push_back((offsets_to.size() == 0 ? 0 : offsets_to.back()) + size);
offsets_to.push_back(offsets_to.back() + size);
typename ColumnVector<T>::Container & data_to = static_cast<ColumnVector<T> &>(arr_to.getData()).getData();
data_to.insert(this->data(place).value.begin(), this->data(place).value.end());
if (size)
{
typename ColumnVector<T>::Container & data_to = static_cast<ColumnVector<T> &>(arr_to.getData()).getData();
data_to.insert(this->data(place).value.begin(), this->data(place).value.end());
}
}
bool allocatesMemoryInArena() const override
@ -370,7 +375,7 @@ public:
auto & column_array = static_cast<ColumnArray &>(to);
auto & offsets = column_array.getOffsets();
offsets.push_back((offsets.size() == 0 ? 0 : offsets.back()) + data(place).elems);
offsets.push_back(offsets.back() + data(place).elems);
auto & column_data = column_array.getData();


@ -83,7 +83,7 @@ public:
const typename State::Set & set = this->data(place).value;
size_t size = set.size();
offsets_to.push_back((offsets_to.size() == 0 ? 0 : offsets_to.back()) + size);
offsets_to.push_back(offsets_to.back() + size);
typename ColumnVector<T>::Container & data_to = static_cast<ColumnVector<T> &>(arr_to.getData()).getData();
size_t old_size = data_to.size();
@ -195,7 +195,7 @@ public:
for (auto & rhs_elem : rhs_set)
{
cur_set.emplace(rhs_elem, it, inserted);
if (inserted)
if (inserted && it->size)
it->data = arena->insert(it->data, it->size);
}
}
@ -207,7 +207,7 @@ public:
IColumn & data_to = arr_to.getData();
auto & set = this->data(place).value;
offsets_to.push_back((offsets_to.size() == 0 ? 0 : offsets_to.back()) + set.size());
offsets_to.push_back(offsets_to.back() + set.size());
for (auto & elem : set)
{


@ -138,7 +138,7 @@ public:
ColumnArray::Offsets & offsets_to = arr_to.getOffsets();
size_t size = levels.size();
offsets_to.push_back((offsets_to.size() == 0 ? 0 : offsets_to.back()) + size);
offsets_to.push_back(offsets_to.back() + size);
if (!size)
return;


@ -68,12 +68,12 @@ struct VarMoments
readPODBinary(*this, buf);
}
T getPopulation() const
T NO_SANITIZE_UNDEFINED getPopulation() const
{
return (m2 - m1 * m1 / m0) / m0;
}
T getSample() const
T NO_SANITIZE_UNDEFINED getSample() const
{
if (m0 == 0)
return std::numeric_limits<T>::quiet_NaN();
@ -177,12 +177,12 @@ struct CovarMoments
readPODBinary(*this, buf);
}
T getPopulation() const
T NO_SANITIZE_UNDEFINED getPopulation() const
{
return (xy - x1 * y1 / m0) / m0;
}
T getSample() const
T NO_SANITIZE_UNDEFINED getSample() const
{
if (m0 == 0)
return std::numeric_limits<T>::quiet_NaN();
@ -232,7 +232,7 @@ struct CorrMoments
readPODBinary(*this, buf);
}
T get() const
T NO_SANITIZE_UNDEFINED get() const
{
return (m0 * xy - x1 * y1) / sqrt((m0 * x2 - x1 * x1) * (m0 * y2 - y1 * y1));
}
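For reference, reading the accumulators as $m_0 = n$, $m_1 = x_1 = \sum x_i$, $y_1 = \sum y_i$, $m_2 = x_2 = \sum x_i^2$, $y_2 = \sum y_i^2$ and $xy = \sum x_i y_i$ (my reading of the surrounding structs, not stated in the diff), the expressions in this hunk are the usual population variance, population covariance and Pearson correlation:

$$
\frac{m_2 - m_1^2/m_0}{m_0}, \qquad
\frac{xy - x_1 y_1/m_0}{m_0}, \qquad
\frac{m_0\,xy - x_1 y_1}{\sqrt{\left(m_0 x_2 - x_1^2\right)\left(m_0 y_2 - y_1^2\right)}}.
$$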


@ -83,7 +83,7 @@ public:
const ColumnArray & array_column = static_cast<const ColumnArray &>(*columns[0]);
const IColumn::Offsets & offsets = array_column.getOffsets();
const auto & keys_vec = static_cast<const ColVecType &>(array_column.getData());
const size_t keys_vec_offset = row_num == 0 ? 0 : offsets[row_num - 1];
const size_t keys_vec_offset = offsets[row_num - 1];
const size_t keys_vec_size = (offsets[row_num] - keys_vec_offset);
// Columns 1..n contain arrays of numeric values to sum
@ -93,7 +93,7 @@ public:
Field value;
const ColumnArray & array_column = static_cast<const ColumnArray &>(*columns[col + 1]);
const IColumn::Offsets & offsets = array_column.getOffsets();
const size_t values_vec_offset = row_num == 0 ? 0 : offsets[row_num - 1];
const size_t values_vec_offset = offsets[row_num - 1];
const size_t values_vec_size = (offsets[row_num] - values_vec_offset);
// Expect key and value arrays to be of same length


@ -93,7 +93,7 @@ public:
auto result_vec = set.topK(threshold);
size_t size = result_vec.size();
offsets_to.push_back((offsets_to.size() == 0 ? 0 : offsets_to.back()) + size);
offsets_to.push_back(offsets_to.back() + size);
typename ColumnVector<T>::Container & data_to = static_cast<ColumnVector<T> &>(arr_to.getData()).getData();
size_t old_size = data_to.size();
@ -212,7 +212,7 @@ public:
IColumn & data_to = arr_to.getData();
auto result_vec = this->data(place).value.topK(threshold);
offsets_to.push_back((offsets_to.size() == 0 ? 0 : offsets_to.back()) + result_vec.size());
offsets_to.push_back(offsets_to.back() + result_vec.size());
for (auto & elem : result_vec)
{


@ -22,3 +22,7 @@ list(REMOVE_ITEM clickhouse_aggregate_functions_headers
add_library(clickhouse_aggregate_functions ${LINK_MODE} ${clickhouse_aggregate_functions_sources})
target_link_libraries(clickhouse_aggregate_functions PRIVATE dbms)
target_include_directories (clickhouse_aggregate_functions BEFORE PRIVATE ${COMMON_INCLUDE_DIR})
if (ENABLE_TESTS)
add_subdirectory (tests)
endif ()


@ -28,8 +28,8 @@ static IAggregateFunction * createWithNumericType(const IDataType & argument_typ
if (which.idx == TypeIndex::TYPE) return new AggregateFunctionTemplate<TYPE>(std::forward<TArgs>(args)...);
FOR_NUMERIC_TYPES(DISPATCH)
#undef DISPATCH
if (which.idx == TypeIndex::Enum8) return new AggregateFunctionTemplate<UInt8>(std::forward<TArgs>(args)...);
if (which.idx == TypeIndex::Enum16) return new AggregateFunctionTemplate<UInt16>(std::forward<TArgs>(args)...);
if (which.idx == TypeIndex::Enum8) return new AggregateFunctionTemplate<Int8>(std::forward<TArgs>(args)...);
if (which.idx == TypeIndex::Enum16) return new AggregateFunctionTemplate<Int16>(std::forward<TArgs>(args)...);
return nullptr;
}
@ -41,8 +41,8 @@ static IAggregateFunction * createWithNumericType(const IDataType & argument_typ
if (which.idx == TypeIndex::TYPE) return new AggregateFunctionTemplate<TYPE, Data>(std::forward<TArgs>(args)...);
FOR_NUMERIC_TYPES(DISPATCH)
#undef DISPATCH
if (which.idx == TypeIndex::Enum8) return new AggregateFunctionTemplate<UInt8, Data>(std::forward<TArgs>(args)...);
if (which.idx == TypeIndex::Enum16) return new AggregateFunctionTemplate<UInt16, Data>(std::forward<TArgs>(args)...);
if (which.idx == TypeIndex::Enum8) return new AggregateFunctionTemplate<Int8, Data>(std::forward<TArgs>(args)...);
if (which.idx == TypeIndex::Enum16) return new AggregateFunctionTemplate<Int16, Data>(std::forward<TArgs>(args)...);
return nullptr;
}
@ -54,8 +54,8 @@ static IAggregateFunction * createWithNumericType(const IDataType & argument_typ
if (which.idx == TypeIndex::TYPE) return new AggregateFunctionTemplate<TYPE, Data<TYPE>>(std::forward<TArgs>(args)...);
FOR_NUMERIC_TYPES(DISPATCH)
#undef DISPATCH
if (which.idx == TypeIndex::Enum8) return new AggregateFunctionTemplate<UInt8, Data<UInt8>>(std::forward<TArgs>(args)...);
if (which.idx == TypeIndex::Enum16) return new AggregateFunctionTemplate<UInt16, Data<UInt16>>(std::forward<TArgs>(args)...);
if (which.idx == TypeIndex::Enum8) return new AggregateFunctionTemplate<Int8, Data<Int8>>(std::forward<TArgs>(args)...);
if (which.idx == TypeIndex::Enum16) return new AggregateFunctionTemplate<Int16, Data<Int16>>(std::forward<TArgs>(args)...);
return nullptr;
}
@ -106,8 +106,8 @@ static IAggregateFunction * createWithTwoNumericTypesSecond(const IDataType & se
if (which.idx == TypeIndex::TYPE) return new AggregateFunctionTemplate<FirstType, TYPE>(std::forward<TArgs>(args)...);
FOR_NUMERIC_TYPES(DISPATCH)
#undef DISPATCH
if (which.idx == TypeIndex::Enum8) return new AggregateFunctionTemplate<FirstType, UInt8>(std::forward<TArgs>(args)...);
if (which.idx == TypeIndex::Enum16) return new AggregateFunctionTemplate<FirstType, UInt16>(std::forward<TArgs>(args)...);
if (which.idx == TypeIndex::Enum8) return new AggregateFunctionTemplate<FirstType, Int8>(std::forward<TArgs>(args)...);
if (which.idx == TypeIndex::Enum16) return new AggregateFunctionTemplate<FirstType, Int16>(std::forward<TArgs>(args)...);
return nullptr;
}
@ -121,9 +121,9 @@ static IAggregateFunction * createWithTwoNumericTypes(const IDataType & first_ty
FOR_NUMERIC_TYPES(DISPATCH)
#undef DISPATCH
if (which.idx == TypeIndex::Enum8)
return createWithTwoNumericTypesSecond<UInt8, AggregateFunctionTemplate>(second_type, std::forward<TArgs>(args)...);
return createWithTwoNumericTypesSecond<Int8, AggregateFunctionTemplate>(second_type, std::forward<TArgs>(args)...);
if (which.idx == TypeIndex::Enum16)
return createWithTwoNumericTypesSecond<UInt16, AggregateFunctionTemplate>(second_type, std::forward<TArgs>(args)...);
return createWithTwoNumericTypesSecond<Int16, AggregateFunctionTemplate>(second_type, std::forward<TArgs>(args)...);
return nullptr;
}


@ -225,6 +225,10 @@ public:
summary.resize(size);
buf.read(reinterpret_cast<char *>(summary.data()), size * sizeof(summary[0]));
count = 0;
for (const auto & c : summary)
count += c.count;
}
/** Calculates the quantile q [0, 1] based on the digest.


@ -0,0 +1,2 @@
add_executable (quantile-t-digest quantile-t-digest.cpp)
target_link_libraries (quantile-t-digest PRIVATE dbms clickhouse_aggregate_functions)


@ -0,0 +1,22 @@
#include <AggregateFunctions/QuantileTDigest.h>
#include <IO/WriteBufferFromString.h>
#include <IO/ReadBufferFromString.h>
#include <iostream>
int main(int, char **)
{
using namespace DB;
QuantileTDigest<float> tdigest;
tdigest.add(1);
tdigest.add(2);
tdigest.add(3);
std::cout << tdigest.get(0.5) << "\n";
WriteBufferFromOwnString wb;
tdigest.serialize(wb);
QuantileTDigest<float> other;
ReadBufferFromString rb{wb.str()};
other.deserialize(rb);
std::cout << other.get(0.5) << "\n";
return 0;
}


@ -13,3 +13,4 @@ add_subdirectory (AggregateFunctions)
add_subdirectory (Client)
add_subdirectory (TableFunctions)
add_subdirectory (Formats)
add_subdirectory (Compression)


@ -2,8 +2,8 @@
#include <Poco/Net/NetException.h>
#include <Core/Defines.h>
#include <IO/CompressedReadBuffer.h>
#include <IO/CompressedWriteBuffer.h>
#include <Compression/CompressedReadBuffer.h>
#include <Compression/CompressedWriteBuffer.h>
#include <IO/ReadBufferFromPocoSocket.h>
#include <IO/WriteBufferFromPocoSocket.h>
#include <IO/ReadHelpers.h>
@ -21,6 +21,7 @@
#include <Common/StringUtils/StringUtils.h>
#include <Common/config_version.h>
#include <Interpreters/ClientInfo.h>
#include <Compression/CompressionFactory.h>
#include <Common/config.h>
#if USE_POCO_NETSSL
@ -353,7 +354,19 @@ void Connection::sendQuery(
if (!connected)
connect();
compression_settings = settings ? CompressionSettings(*settings) : CompressionSettings(CompressionMethod::LZ4);
if (settings)
{
std::optional<int> level;
std::string method = settings->network_compression_method;
/// Bad custom logic
if (method == "ZSTD")
level = settings->network_zstd_compression_level;
compression_codec = CompressionCodecFactory::instance().get(method, level);
}
else
compression_codec = CompressionCodecFactory::instance().getDefaultCodec();
query_id = query_id_;
@ -426,7 +439,7 @@ void Connection::sendData(const Block & block, const String & name)
if (!block_out)
{
if (compression == Protocol::Compression::Enable)
maybe_compressed_out = std::make_shared<CompressedWriteBuffer>(*out, compression_settings);
maybe_compressed_out = std::make_shared<CompressedWriteBuffer>(*out, compression_codec);
else
maybe_compressed_out = out;


@ -18,12 +18,13 @@
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/BlockStreamProfileInfo.h>
#include <IO/CompressionSettings.h>
#include <IO/ConnectionTimeouts.h>
#include <Interpreters/Settings.h>
#include <Interpreters/TablesStatus.h>
#include <Compression/ICompressionCodec.h>
#include <atomic>
#include <optional>
@ -205,7 +206,7 @@ private:
Protocol::Secure secure; /// Enable data encryption for communication.
/// What compression settings to use while sending data for INSERT queries and external tables.
CompressionSettings compression_settings;
CompressionCodecPtr compression_codec;
/** If not nullptr, used to limit network traffic.
* Only traffic for transferring blocks is accounted; other packets are not.


@ -378,7 +378,7 @@ const char * ColumnAggregateFunction::deserializeAndInsertFromArena(const char *
* as we cannot legally compare pointers after last element + 1 of some valid memory region.
* Probably this will not work under UBSan.
*/
ReadBufferFromMemory read_buffer(src_arena, std::numeric_limits<char *>::max() - src_arena);
ReadBufferFromMemory read_buffer(src_arena, std::numeric_limits<char *>::max() - src_arena - 1);
func->deserialize(data.back(), read_buffer, &dst_arena);
return read_buffer.position();


@ -134,13 +134,13 @@ StringRef ColumnArray::getDataAt(size_t n) const
* since it contains only the data laid in succession, but not the offsets.
*/
size_t array_size = sizeAt(n);
if (array_size == 0)
return StringRef();
size_t offset_of_first_elem = offsetAt(n);
StringRef first = getData().getDataAtWithTerminatingZero(offset_of_first_elem);
size_t array_size = sizeAt(n);
if (array_size == 0)
return StringRef(first.data, 0);
size_t offset_of_last_elem = getOffsets()[n] - 1;
StringRef last = getData().getDataAtWithTerminatingZero(offset_of_last_elem);
@ -166,7 +166,7 @@ void ColumnArray::insertData(const char * pos, size_t length)
if (pos != end)
throw Exception("Incorrect length argument for method ColumnArray::insertData", ErrorCodes::BAD_ARGUMENTS);
getOffsets().push_back((getOffsets().size() == 0 ? 0 : getOffsets().back()) + elems);
getOffsets().push_back(getOffsets().back() + elems);
}
@ -194,7 +194,7 @@ const char * ColumnArray::deserializeAndInsertFromArena(const char * pos)
for (size_t i = 0; i < array_size; ++i)
pos = getData().deserializeAndInsertFromArena(pos);
getOffsets().push_back((getOffsets().size() == 0 ? 0 : getOffsets().back()) + array_size);
getOffsets().push_back(getOffsets().back() + array_size);
return pos;
}
@ -216,7 +216,7 @@ void ColumnArray::insert(const Field & x)
size_t size = array.size();
for (size_t i = 0; i < size; ++i)
getData().insert(array[i]);
getOffsets().push_back((getOffsets().size() == 0 ? 0 : getOffsets().back()) + size);
getOffsets().push_back(getOffsets().back() + size);
}
@ -227,13 +227,13 @@ void ColumnArray::insertFrom(const IColumn & src_, size_t n)
size_t offset = src.offsetAt(n);
getData().insertRangeFrom(src.getData(), offset, size);
getOffsets().push_back((getOffsets().size() == 0 ? 0 : getOffsets().back()) + size);
getOffsets().push_back(getOffsets().back() + size);
}
void ColumnArray::insertDefault()
{
getOffsets().push_back(getOffsets().size() == 0 ? 0 : getOffsets().back());
getOffsets().push_back(getOffsets().back());
}


@ -124,8 +124,8 @@ private:
ColumnPtr data;
ColumnPtr offsets;
size_t ALWAYS_INLINE offsetAt(size_t i) const { return i == 0 ? 0 : getOffsets()[i - 1]; }
size_t ALWAYS_INLINE sizeAt(size_t i) const { return i == 0 ? getOffsets()[0] : (getOffsets()[i] - getOffsets()[i - 1]); }
size_t ALWAYS_INLINE offsetAt(ssize_t i) const { return getOffsets()[i - 1]; }
size_t ALWAYS_INLINE sizeAt(ssize_t i) const { return getOffsets()[i] - getOffsets()[i - 1]; }
/// Multiply values if the nested column is ColumnVector<T>.


@ -3,6 +3,7 @@
#include <cmath>
#include <Columns/IColumn.h>
#include <Columns/ColumnVectorHelper.h>
namespace DB
@ -53,13 +54,13 @@ private:
/// A ColumnVector for Decimals
template <typename T>
class ColumnDecimal final : public COWPtrHelper<IColumn, ColumnDecimal<T>>
class ColumnDecimal final : public COWPtrHelper<ColumnVectorHelper, ColumnDecimal<T>>
{
static_assert(IsDecimalNumber<T>);
private:
using Self = ColumnDecimal;
friend class COWPtrHelper<IColumn, Self>;
friend class COWPtrHelper<ColumnVectorHelper, Self>;
public:
using Container = DecimalPaddedPODArray<T>;


@ -1,9 +1,10 @@
#pragma once
#include <string.h> // memcpy
#include <string.h> // memcmp
#include <Common/PODArray.h>
#include <Columns/IColumn.h>
#include <Columns/ColumnVectorHelper.h>
namespace DB
@ -12,10 +13,10 @@ namespace DB
/** A column of values of "fixed-length string" type.
* If you insert a smaller string, it will be padded with zero bytes.
*/
class ColumnFixedString final : public COWPtrHelper<IColumn, ColumnFixedString>
class ColumnFixedString final : public COWPtrHelper<ColumnVectorHelper, ColumnFixedString>
{
public:
friend class COWPtrHelper<IColumn, ColumnFixedString>;
friend class COWPtrHelper<ColumnVectorHelper, ColumnFixedString>;
using Chars = PaddedPODArray<UInt8>;


@ -236,6 +236,9 @@ void ColumnLowCardinality::gather(ColumnGathererStream & gatherer)
MutableColumnPtr ColumnLowCardinality::cloneResized(size_t size) const
{
auto unique_ptr = dictionary.getColumnUniquePtr();
if (size == 0)
unique_ptr = unique_ptr->cloneEmpty();
return ColumnLowCardinality::create((*std::move(unique_ptr)).mutate(), getIndexes().cloneResized(size));
}


@ -148,7 +148,7 @@ ColumnPtr ColumnString::permute(const Permutation & perm, size_t limit) const
for (size_t i = 0; i < limit; ++i)
{
size_t j = perm[i];
size_t string_offset = j == 0 ? 0 : offsets[j - 1];
size_t string_offset = offsets[j - 1];
size_t string_size = offsets[j] - string_offset;
memcpySmallAllowReadWriteOverflow15(&res_chars[current_new_offset], &chars[string_offset], string_size);
@ -219,7 +219,7 @@ ColumnPtr ColumnString::indexImpl(const PaddedPODArray<Type> & indexes, size_t l
for (size_t i = 0; i < limit; ++i)
{
size_t j = indexes[i];
size_t string_offset = j == 0 ? 0 : offsets[j - 1];
size_t string_offset = offsets[j - 1];
size_t string_size = offsets[j] - string_offset;
memcpySmallAllowReadWriteOverflow15(&res_chars[current_new_offset], &chars[string_offset], string_size);


@ -31,10 +31,10 @@ private:
/// For convenience, every string ends with a terminating zero byte. Note that strings could contain zero bytes in the middle.
Chars chars;
size_t ALWAYS_INLINE offsetAt(size_t i) const { return i == 0 ? 0 : offsets[i - 1]; }
size_t ALWAYS_INLINE offsetAt(ssize_t i) const { return offsets[i - 1]; }
/// Size of i-th element, including terminating zero.
size_t ALWAYS_INLINE sizeAt(size_t i) const { return i == 0 ? offsets[0] : (offsets[i] - offsets[i - 1]); }
size_t ALWAYS_INLINE sizeAt(ssize_t i) const { return offsets[i] - offsets[i - 1]; }
template <bool positive>
struct less;
@ -153,7 +153,8 @@ public:
const size_t new_size = old_size + length + 1;
chars.resize(new_size);
memcpy(&chars[old_size], pos, length);
if (length)
memcpy(&chars[old_size], pos, length);
chars[old_size + length] = 0;
offsets.push_back(new_size);
}
@ -203,7 +204,7 @@ public:
void insertDefault() override
{
chars.push_back(0);
offsets.push_back(offsets.size() == 0 ? 1 : (offsets.back() + 1));
offsets.push_back(offsets.back() + 1);
}
int compareAt(size_t n, size_t m, const IColumn & rhs_, int /*nan_direction_hint*/) const override


@ -2,6 +2,7 @@
#include <cmath>
#include <Columns/IColumn.h>
#include <Columns/ColumnVectorHelper.h>
#include <common/unaligned.h>
@ -86,47 +87,16 @@ template <> struct CompareHelper<Float32> : public FloatCompareHelper<Float32> {
template <> struct CompareHelper<Float64> : public FloatCompareHelper<Float64> {};
/** To implement `get64` function.
*/
template <typename T>
inline UInt64 unionCastToUInt64(T x) { return x; }
template <> inline UInt64 unionCastToUInt64(Float64 x)
{
union
{
Float64 src;
UInt64 res;
};
src = x;
return res;
}
template <> inline UInt64 unionCastToUInt64(Float32 x)
{
union
{
Float32 src;
UInt64 res;
};
res = 0;
src = x;
return res;
}
/** A template for columns that use a simple array to store.
*/
template <typename T>
class ColumnVector final : public COWPtrHelper<IColumn, ColumnVector<T>>
class ColumnVector final : public COWPtrHelper<ColumnVectorHelper, ColumnVector<T>>
{
static_assert(!IsDecimalNumber<T>);
private:
using Self = ColumnVector;
friend class COWPtrHelper<IColumn, Self>;
friend class COWPtrHelper<ColumnVectorHelper, Self>;
struct less;
struct greater;


@ -0,0 +1,39 @@
#pragma once
#include <Columns/IColumn.h>
namespace DB
{
/** Allows access to the internal array of ColumnVector or ColumnFixedString without a cast to the concrete type.
* We will inherit ColumnVector and ColumnFixedString from this class instead of IColumn.
* Assumes the data layout of ColumnVector, ColumnFixedString and PODArray.
*
* Why is it needed?
*
* There are some algorithms that specialize on the size of the data type but don't care about the concrete type.
* The same specialization may work for UInt64, Int64, Float64, FixedString(8), if it only does byte moving and hashing.
* To avoid code bloat and a compile time increase, we can use a single template instantiation for these cases
* and just static_cast the pointer to some single column type (e. g. ColumnUInt64), assuming that all these types have an identical memory layout.
*
* But this static_cast (a downcast to an unrelated type) is illegal according to the C++ standard and UBSan warns about it.
* To allow functional tests to work under UBSan, we have to separate out a base class that presents the memory layout explicitly,
* and we will do the static_cast to this class.
*/
class ColumnVectorHelper : public IColumn
{
public:
const char * getRawDataBegin() const
{
return *reinterpret_cast<const char * const *>(reinterpret_cast<const char *>(this) + sizeof(*this));
}
template <size_t ELEMENT_SIZE>
void insertRawData(const char * ptr)
{
return reinterpret_cast<PODArrayBase<ELEMENT_SIZE, 4096, Allocator<false>, 15, 16> *>(reinterpret_cast<char *>(this) + sizeof(*this))->push_back_raw(ptr);
}
};
}
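A minimal usage sketch (hypothetical helper function, assuming it is compiled inside the ClickHouse source tree): the same instantiation can serve ColumnUInt64, ColumnInt64, ColumnFloat64 and FixedString(8), because only the 8-byte element size matters.
```
#include <Columns/ColumnVectorHelper.h>
#include <Columns/IColumn.h>

namespace DB
{

/// Copy the first 8-byte element of `src` into `dst` without knowing the concrete column type.
void copyFirstElement8(const IColumn & src, IColumn & dst)
{
    /// Downcast to the layout-describing base instead of a concrete column type.
    const auto & src_helper = static_cast<const ColumnVectorHelper &>(src);
    auto & dst_helper = static_cast<ColumnVectorHelper &>(dst);

    dst_helper.insertRawData<8>(src_helper.getRawDataBegin());
}

}
```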


@ -107,7 +107,7 @@ public:
if (s != offsets.size())
throw Exception("Size of offsets doesn't match size of column.", ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
return cloneDummy(s == 0 ? 0 : offsets.back());
return cloneDummy(offsets.back());
}
MutableColumns scatter(ColumnIndex num_columns, const Selector & selector) const override


@ -36,7 +36,7 @@ private:
static constexpr size_t pad_right = 15;
/// Contiguous chunk of memory and pointer to free space inside it. Member of single-linked list.
struct Chunk : private Allocator<false> /// empty base optimization
struct alignas(16) Chunk : private Allocator<false> /// empty base optimization
{
char * begin;
char * pos;
@ -149,6 +149,12 @@ public:
} while (true);
}
template <typename T>
T * alloc()
{
return reinterpret_cast<T *>(alignedAlloc(sizeof(T), alignof(T)));
}
/** Rollback just performed allocation.
* Must pass a size no more than was just allocated.
*/
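A small usage sketch of the new typed `alloc<T>()` helper (hypothetical `Node` type, assuming the ClickHouse `Common/Arena.h` header):
```
#include <Common/Arena.h>

struct Node { int value; Node * next; };   /// hypothetical payload type

void appendNode(DB::Arena & arena)
{
    /// Allocates sizeof(Node) bytes aligned to alignof(Node) from the arena; the memory lives
    /// until the arena is destroyed, so there is no per-object free. Note that no constructor
    /// is run, so this is only suitable for trivially-constructible types (or use placement new).
    Node * node = arena.alloc<Node>();
    node->value = 42;
    node->next = nullptr;
}
```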


@ -55,26 +55,6 @@ public:
return locus;
}
void readText(ReadBuffer & in)
{
for (size_t i = 0; i < BITSET_SIZE; ++i)
{
if (i != 0)
assertChar(',', in);
readIntText(bitset[i], in);
}
}
void writeText(WriteBuffer & out) const
{
for (size_t i = 0; i < BITSET_SIZE; ++i)
{
if (i != 0)
writeCString(",", out);
writeIntText(bitset[i], out);
}
}
private:
/// number of bytes in bitset
static constexpr size_t BITSET_SIZE = (static_cast<size_t>(bucket_count) * content_width + 7) / 8;
@ -165,7 +145,9 @@ private:
bool fits_in_byte;
};
/** The `Locus` structure contains the necessary information to find for each cell
/** TODO This code looks very suboptimal.
*
* The `Locus` structure contains the necessary information to find for each cell
* the corresponding byte and the offset, in bits, from the beginning of the cell. Since in the general
* case the size of one byte is not divisible by the size of one cell, cases are possible
* where one cell overlaps two bytes. Therefore, the `Locus` structure contains two
@ -219,13 +201,20 @@ private:
void ALWAYS_INLINE init(BucketIndex bucket_index)
{
/// offset in bits to the leftmost bit
size_t l = static_cast<size_t>(bucket_index) * content_width;
index_l = l >> 3;
offset_l = l & 7;
size_t r = static_cast<size_t>(bucket_index + 1) * content_width;
index_r = r >> 3;
offset_r = r & 7;
/// offset of byte that contains the leftmost bit
index_l = l / 8;
/// offset in bits to the leftmost bit at that byte
offset_l = l % 8;
/// offset of byte that contains the rightmost bit
index_r = (l + content_width - 1) / 8;
/// offset in bits, within that byte, of the bit immediately after the rightmost bit; or zero if the rightmost bit is the last bit of that byte.
offset_r = (l + content_width) % 8;
}
UInt8 ALWAYS_INLINE read(UInt8 value_l) const
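A worked example of the arithmetic in `init()` above, for hypothetical values content_width = 3 and bucket_index = 5 (a cell that overlaps two bytes):
```
#include <cstddef>
#include <iostream>

int main()
{
    const size_t content_width = 3;
    const size_t bucket_index = 5;

    const size_t l = bucket_index * content_width;        /// 15: offset in bits to the leftmost bit
    const size_t index_l = l / 8;                          /// 1: byte containing the leftmost bit
    const size_t offset_l = l % 8;                         /// 7: bit offset of the leftmost bit inside that byte
    const size_t index_r = (l + content_width - 1) / 8;    /// 2: byte containing the rightmost bit
    const size_t offset_r = (l + content_width) % 8;       /// 2: offset just past the rightmost bit (0 if it ends the byte)

    std::cout << index_l << " " << offset_l << " " << index_r << " " << offset_r << "\n";   /// 1 7 2 2
    return 0;
}
```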


@ -447,6 +447,11 @@ XMLDocumentPtr ConfigProcessor::processConfig(
merge(config, with);
contributing_files.push_back(merge_file);
}
catch (Exception & e)
{
e.addMessage("while merging config '" + path + "' with '" + merge_file + "'");
throw;
}
catch (Poco::Exception & e)
{
throw Poco::Exception("Failed to merge config with '" + merge_file + "': " + e.displayText());
@ -479,6 +484,11 @@ XMLDocumentPtr ConfigProcessor::processConfig(
doIncludesRecursive(config, include_from, getRootNode(config.get()), zk_node_cache, zk_changed_event, contributing_zk_paths);
}
catch (Exception & e)
{
e.addMessage("while preprocessing config '" + path + "'");
throw;
}
catch (Poco::Exception & e)
{
throw Poco::Exception("Failed to preprocess config '" + path + "': " + e.displayText(), e);


@ -81,7 +81,7 @@ void ConfigReloader::reloadIfNewer(bool force, bool throw_on_error, bool fallbac
std::lock_guard<std::mutex> lock(reload_mutex);
FilesChangesTracker new_files = getNewFileList();
if (force || new_files.isDifferOrNewerThan(files))
if (force || need_reload_from_zk || new_files.isDifferOrNewerThan(files))
{
ConfigProcessor config_processor(path);
ConfigProcessor::LoadedConfig loaded_config;
@ -94,6 +94,17 @@ void ConfigReloader::reloadIfNewer(bool force, bool throw_on_error, bool fallbac
loaded_config = config_processor.loadConfigWithZooKeeperIncludes(
zk_node_cache, zk_changed_event, fallback_to_preprocessed);
}
catch (const Coordination::Exception & e)
{
if (Coordination::isHardwareError(e.code))
need_reload_from_zk = true;
if (throw_on_error)
throw;
tryLogCurrentException(log, "ZooKeeper error when loading config from `" + path + "'");
return;
}
catch (...)
{
if (throw_on_error)
@ -110,7 +121,10 @@ void ConfigReloader::reloadIfNewer(bool force, bool throw_on_error, bool fallbac
* When file has been written (and contain valid data), we don't load new data since modification time remains the same.
*/
if (!loaded_config.loaded_from_preprocessed)
{
files = std::move(new_files);
need_reload_from_zk = false;
}
try
{


@ -75,6 +75,7 @@ private:
std::string preprocessed_dir;
FilesChangesTracker files;
zkutil::ZooKeeperNodeCache zk_node_cache;
bool need_reload_from_zk = false;
zkutil::EventPtr zk_changed_event = std::make_shared<Poco::Event>();
Updater updater;


@ -405,6 +405,9 @@ namespace ErrorCodes
extern const int UNKNOWN_LOG_LEVEL = 428;
extern const int FAILED_TO_GETPWUID = 429;
extern const int MISMATCHING_USERS_FOR_PROCESS_AND_DATA = 430;
extern const int ILLEGAL_SYNTAX_FOR_CODEC_TYPE = 431;
extern const int UNKNOWN_CODEC = 432;
extern const int ILLEGAL_CODEC_PARAMETER = 433;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;


@ -0,0 +1,8 @@
#include <Common/PODArray.h>
namespace DB
{
/// Used for left padding of PODArray when empty
const char EmptyPODArray[EmptyPODArraySize]{};
}


@ -20,6 +20,11 @@
namespace DB
{
inline constexpr size_t integerRoundUp(size_t value, size_t dividend)
{
return ((value + dividend - 1) / dividend) * dividend;
}
/** A dynamic array for POD types.
* Designed for a small number of large arrays (rather than a lot of small ones).
* To be more precise - for use in ColumnVector.
@ -37,6 +42,10 @@ namespace DB
* The template parameter `pad_right` - always allocate that many unused bytes at the end of the array.
* Can be used to make optimistic reading, writing, copying with unaligned SIMD instructions.
*
* The template parameter `pad_left` - always allocate memory before the 0th element of the array (rounded up to a whole number of elements)
* and zero-initialize the -1th element. It allows using the -1th element, which will always have the value 0.
* This gives performance benefits when converting an array of offsets to an array of sizes.
*
* Some methods using allocator have TAllocatorParams variadic arguments.
* These arguments will be passed to corresponding methods of TAllocator.
* Example: pointer to Arena, that is used for allocations.
@ -49,30 +58,34 @@ namespace DB
* TODO Pass alignment to Allocator.
* TODO Allow greater alignment than alignof(T). Example: array of char aligned to page size.
*/
template <typename T, size_t INITIAL_SIZE = 4096, typename TAllocator = Allocator<false>, size_t pad_right_ = 0>
class PODArray : private boost::noncopyable, private TAllocator /// empty base optimization
static constexpr size_t EmptyPODArraySize = 1024;
extern const char EmptyPODArray[EmptyPODArraySize];
/** Base class that depends only on the size of the element, not on the element type itself.
* You can static_cast to this class if you want to insert some data regardless of the actual type T.
*/
template <size_t ELEMENT_SIZE, size_t INITIAL_SIZE, typename TAllocator, size_t pad_right_, size_t pad_left_>
class PODArrayBase : private boost::noncopyable, private TAllocator /// empty base optimization
{
protected:
/// Round padding up to a whole number of elements to simplify arithmetic.
static constexpr size_t pad_right = (pad_right_ + sizeof(T) - 1) / sizeof(T) * sizeof(T);
static constexpr size_t pad_right = integerRoundUp(pad_right_, ELEMENT_SIZE);
/// pad_left is also rounded up to 16 bytes to maintain alignment of allocated memory.
static constexpr size_t pad_left = integerRoundUp(integerRoundUp(pad_left_, ELEMENT_SIZE), 16);
/// Empty array will point to this static memory as padding.
static constexpr char * null = pad_left ? const_cast<char *>(EmptyPODArray) + EmptyPODArraySize : nullptr;
char * c_start = nullptr;
char * c_end = nullptr;
char * c_end_of_storage = nullptr; /// Does not include pad_right.
static_assert(pad_left <= EmptyPODArraySize && "Left Padding exceeds EmptyPODArraySize. Is the element size too large?");
T * t_start() { return reinterpret_cast<T *>(c_start); }
T * t_end() { return reinterpret_cast<T *>(c_end); }
T * t_end_of_storage() { return reinterpret_cast<T *>(c_end_of_storage); }
const T * t_start() const { return reinterpret_cast<const T *>(c_start); }
const T * t_end() const { return reinterpret_cast<const T *>(c_end); }
const T * t_end_of_storage() const { return reinterpret_cast<const T *>(c_end_of_storage); }
char * c_start = null; /// Does not include pad_left.
char * c_end = null;
char * c_end_of_storage = null; /// Does not include pad_right.
/// The amount of memory occupied by num_elements elements.
static size_t byte_size(size_t num_elements) { return num_elements * sizeof(T); }
static size_t byte_size(size_t num_elements) { return num_elements * ELEMENT_SIZE; }
/// Minimum amount of memory to allocate for num_elements, including padding.
static size_t minimum_memory_for_elements(size_t num_elements) { return byte_size(num_elements) + pad_right; }
static size_t minimum_memory_for_elements(size_t num_elements) { return byte_size(num_elements) + pad_right + pad_left; }
void alloc_for_num_elements(size_t num_elements)
{
@ -82,22 +95,25 @@ protected:
template <typename ... TAllocatorParams>
void alloc(size_t bytes, TAllocatorParams &&... allocator_params)
{
c_start = c_end = reinterpret_cast<char *>(TAllocator::alloc(bytes, std::forward<TAllocatorParams>(allocator_params)...));
c_end_of_storage = c_start + bytes - pad_right;
c_start = c_end = reinterpret_cast<char *>(TAllocator::alloc(bytes, std::forward<TAllocatorParams>(allocator_params)...)) + pad_left;
c_end_of_storage = c_start + bytes - pad_right - pad_left;
if (pad_left)
memset(c_start - ELEMENT_SIZE, 0, ELEMENT_SIZE);
}
void dealloc()
{
if (c_start == nullptr)
if (c_start == null)
return;
TAllocator::free(c_start, allocated_bytes());
TAllocator::free(c_start - pad_left, allocated_bytes());
}
template <typename ... TAllocatorParams>
void realloc(size_t bytes, TAllocatorParams &&... allocator_params)
{
if (c_start == nullptr)
if (c_start == null)
{
alloc(bytes, std::forward<TAllocatorParams>(allocator_params)...);
return;
@ -105,15 +121,20 @@ protected:
ptrdiff_t end_diff = c_end - c_start;
c_start = reinterpret_cast<char *>(TAllocator::realloc(c_start, allocated_bytes(), bytes, std::forward<TAllocatorParams>(allocator_params)...));
c_start = reinterpret_cast<char *>(
TAllocator::realloc(c_start - pad_left, allocated_bytes(), bytes, std::forward<TAllocatorParams>(allocator_params)...))
+ pad_left;
c_end = c_start + end_diff;
c_end_of_storage = c_start + bytes - pad_right;
c_end_of_storage = c_start + bytes - pad_right - pad_left;
if (pad_left)
memset(c_start - ELEMENT_SIZE, 0, ELEMENT_SIZE);
}
bool isInitialized() const
{
return (c_start != nullptr) && (c_end != nullptr) && (c_end_of_storage != nullptr);
return (c_start != null) && (c_end != null) && (c_end_of_storage != null);
}
bool isAllocatedFromStack() const
@ -127,9 +148,9 @@ protected:
{
if (size() == 0)
{
// The allocated memory should be a multiple of sizeof(T) to hold the element, otherwise,
// memory issues such as corruption could appear in edge cases.
realloc(std::max(((INITIAL_SIZE - 1) / sizeof(T) + 1) * sizeof(T), minimum_memory_for_elements(1)),
// The allocated memory should be a multiple of ELEMENT_SIZE to hold the element, otherwise,
realloc(std::max(((INITIAL_SIZE - 1) / ELEMENT_SIZE + 1) * ELEMENT_SIZE, minimum_memory_for_elements(1)),
std::forward<TAllocatorParams>(allocator_params)...);
}
else
@ -137,83 +158,13 @@ protected:
}
public:
using value_type = T;
bool empty() const { return c_end == c_start; }
size_t size() const { return (c_end - c_start) / ELEMENT_SIZE; }
size_t capacity() const { return (c_end_of_storage - c_start) / ELEMENT_SIZE; }
size_t allocated_bytes() const { return c_end_of_storage - c_start + pad_right; }
size_t allocated_bytes() const { return c_end_of_storage - c_start + pad_right + pad_left; }
/// You can not just use `typedef`, because there is ambiguity for the constructors and `assign` functions.
struct iterator : public boost::iterator_adaptor<iterator, T*>
{
iterator() {}
iterator(T * ptr_) : iterator::iterator_adaptor_(ptr_) {}
};
struct const_iterator : public boost::iterator_adaptor<const_iterator, const T*>
{
const_iterator() {}
const_iterator(const T * ptr_) : const_iterator::iterator_adaptor_(ptr_) {}
};
PODArray() {}
PODArray(size_t n)
{
alloc_for_num_elements(n);
c_end += byte_size(n);
}
PODArray(size_t n, const T & x)
{
alloc_for_num_elements(n);
assign(n, x);
}
PODArray(const_iterator from_begin, const_iterator from_end)
{
alloc_for_num_elements(from_end - from_begin);
insert(from_begin, from_end);
}
PODArray(std::initializer_list<T> il) : PODArray(std::begin(il), std::end(il)) {}
~PODArray()
{
dealloc();
}
PODArray(PODArray && other)
{
this->swap(other);
}
PODArray & operator=(PODArray && other)
{
this->swap(other);
return *this;
}
T * data() { return t_start(); }
const T * data() const { return t_start(); }
size_t size() const { return t_end() - t_start(); }
bool empty() const { return t_end() == t_start(); }
size_t capacity() const { return t_end_of_storage() - t_start(); }
T & operator[] (size_t n) { return t_start()[n]; }
const T & operator[] (size_t n) const { return t_start()[n]; }
T & front() { return t_start()[0]; }
T & back() { return t_end()[-1]; }
const T & front() const { return t_start()[0]; }
const T & back() const { return t_end()[-1]; }
iterator begin() { return t_start(); }
iterator end() { return t_end(); }
const_iterator begin() const { return t_start(); }
const_iterator end() const { return t_end(); }
const_iterator cbegin() const { return t_start(); }
const_iterator cend() const { return t_end(); }
void clear() { c_end = c_start; }
template <typename ... TAllocatorParams>
void reserve(size_t n, TAllocatorParams &&... allocator_params)
@ -234,42 +185,141 @@ public:
c_end = c_start + byte_size(n);
}
const char * raw_data() const
{
return c_start;
}
template <typename ... TAllocatorParams>
void push_back_raw(const char * ptr, TAllocatorParams &&... allocator_params)
{
if (unlikely(c_end == c_end_of_storage))
reserveForNextSize(std::forward<TAllocatorParams>(allocator_params)...);
memcpy(c_end, ptr, ELEMENT_SIZE);
c_end += byte_size(1);
}
~PODArrayBase()
{
dealloc();
}
};
template <typename T, size_t INITIAL_SIZE = 4096, typename TAllocator = Allocator<false>, size_t pad_right_ = 0, size_t pad_left_ = 0>
class PODArray : public PODArrayBase<sizeof(T), INITIAL_SIZE, TAllocator, pad_right_, pad_left_>
{
protected:
using Base = PODArrayBase<sizeof(T), INITIAL_SIZE, TAllocator, pad_right_, pad_left_>;
T * t_start() { return reinterpret_cast<T *>(this->c_start); }
T * t_end() { return reinterpret_cast<T *>(this->c_end); }
T * t_end_of_storage() { return reinterpret_cast<T *>(this->c_end_of_storage); }
const T * t_start() const { return reinterpret_cast<const T *>(this->c_start); }
const T * t_end() const { return reinterpret_cast<const T *>(this->c_end); }
const T * t_end_of_storage() const { return reinterpret_cast<const T *>(this->c_end_of_storage); }
public:
using value_type = T;
/// You can not just use `typedef`, because there is ambiguity for the constructors and `assign` functions.
struct iterator : public boost::iterator_adaptor<iterator, T*>
{
iterator() {}
iterator(T * ptr_) : iterator::iterator_adaptor_(ptr_) {}
};
struct const_iterator : public boost::iterator_adaptor<const_iterator, const T*>
{
const_iterator() {}
const_iterator(const T * ptr_) : const_iterator::iterator_adaptor_(ptr_) {}
};
PODArray() {}
PODArray(size_t n)
{
this->alloc_for_num_elements(n);
this->c_end += this->byte_size(n);
}
PODArray(size_t n, const T & x)
{
this->alloc_for_num_elements(n);
assign(n, x);
}
PODArray(const_iterator from_begin, const_iterator from_end)
{
this->alloc_for_num_elements(from_end - from_begin);
insert(from_begin, from_end);
}
PODArray(std::initializer_list<T> il) : PODArray(std::begin(il), std::end(il)) {}
PODArray(PODArray && other)
{
this->swap(other);
}
PODArray & operator=(PODArray && other)
{
this->swap(other);
return *this;
}
T * data() { return t_start(); }
const T * data() const { return t_start(); }
/// The index is signed to access -1th element without pointer overflow.
T & operator[] (ssize_t n) { return t_start()[n]; }
const T & operator[] (ssize_t n) const { return t_start()[n]; }
T & front() { return t_start()[0]; }
T & back() { return t_end()[-1]; }
const T & front() const { return t_start()[0]; }
const T & back() const { return t_end()[-1]; }
iterator begin() { return t_start(); }
iterator end() { return t_end(); }
const_iterator begin() const { return t_start(); }
const_iterator end() const { return t_end(); }
const_iterator cbegin() const { return t_start(); }
const_iterator cend() const { return t_end(); }
/// Same as resize, but zeroes new elements.
void resize_fill(size_t n)
{
size_t old_size = size();
size_t old_size = this->size();
if (n > old_size)
{
reserve(n);
memset(c_end, 0, byte_size(n - old_size));
this->reserve(n);
memset(this->c_end, 0, this->byte_size(n - old_size));
}
c_end = c_start + byte_size(n);
this->c_end = this->c_start + this->byte_size(n);
}
void resize_fill(size_t n, const T & value)
{
size_t old_size = size();
size_t old_size = this->size();
if (n > old_size)
{
reserve(n);
this->reserve(n);
std::fill(t_end(), t_end() + n - old_size, value);
}
c_end = c_start + byte_size(n);
}
void clear()
{
c_end = c_start;
this->c_end = this->c_start + this->byte_size(n);
}
template <typename ... TAllocatorParams>
void push_back(const T & x, TAllocatorParams &&... allocator_params)
{
if (unlikely(c_end == c_end_of_storage))
reserveForNextSize(std::forward<TAllocatorParams>(allocator_params)...);
if (unlikely(this->c_end == this->c_end_of_storage))
this->reserveForNextSize(std::forward<TAllocatorParams>(allocator_params)...);
*t_end() = x;
c_end += byte_size(1);
this->c_end += this->byte_size(1);
}
/** This method doesn't allow to pass parameters for Allocator,
@ -278,25 +328,25 @@ public:
template <typename... Args>
void emplace_back(Args &&... args)
{
if (unlikely(c_end == c_end_of_storage))
reserveForNextSize();
if (unlikely(this->c_end == this->c_end_of_storage))
this->reserveForNextSize();
new (t_end()) T(std::forward<Args>(args)...);
c_end += byte_size(1);
this->c_end += this->byte_size(1);
}
void pop_back()
{
c_end -= byte_size(1);
this->c_end -= this->byte_size(1);
}
/// Do not insert a piece of the array into itself, because a resize can invalidate iterators into it.
template <typename It1, typename It2, typename ... TAllocatorParams>
void insertPrepare(It1 from_begin, It2 from_end, TAllocatorParams &&... allocator_params)
{
size_t required_capacity = size() + (from_end - from_begin);
if (required_capacity > capacity())
reserve(roundUpToPowerOfTwoOrZero(required_capacity), std::forward<TAllocatorParams>(allocator_params)...);
size_t required_capacity = this->size() + (from_end - from_begin);
if (required_capacity > this->capacity())
this->reserve(roundUpToPowerOfTwoOrZero(required_capacity), std::forward<TAllocatorParams>(allocator_params)...);
}
/// Do not insert a piece of the array into itself, because a resize can invalidate iterators into it.
@ -313,9 +363,9 @@ public:
{
static_assert(pad_right_ >= 15);
insertPrepare(from_begin, from_end, std::forward<TAllocatorParams>(allocator_params)...);
size_t bytes_to_copy = byte_size(from_end - from_begin);
memcpySmallAllowReadWriteOverflow15(c_end, reinterpret_cast<const void *>(&*from_begin), bytes_to_copy);
c_end += bytes_to_copy;
size_t bytes_to_copy = this->byte_size(from_end - from_begin);
memcpySmallAllowReadWriteOverflow15(this->c_end, reinterpret_cast<const void *>(&*from_begin), bytes_to_copy);
this->c_end += bytes_to_copy;
}
template <typename It1, typename It2>
@ -323,22 +373,22 @@ public:
{
insertPrepare(from_begin, from_end);
size_t bytes_to_copy = byte_size(from_end - from_begin);
size_t bytes_to_copy = this->byte_size(from_end - from_begin);
size_t bytes_to_move = (end() - it) * sizeof(T);
if (unlikely(bytes_to_move))
memcpy(c_end + bytes_to_copy - bytes_to_move, c_end - bytes_to_move, bytes_to_move);
memcpy(this->c_end + bytes_to_copy - bytes_to_move, this->c_end - bytes_to_move, bytes_to_move);
memcpy(c_end - bytes_to_move, reinterpret_cast<const void *>(&*from_begin), bytes_to_copy);
c_end += bytes_to_copy;
memcpy(this->c_end - bytes_to_move, reinterpret_cast<const void *>(&*from_begin), bytes_to_copy);
this->c_end += bytes_to_copy;
}
template <typename It1, typename It2>
void insert_assume_reserved(It1 from_begin, It2 from_end)
{
size_t bytes_to_copy = byte_size(from_end - from_begin);
memcpy(c_end, reinterpret_cast<const void *>(&*from_begin), bytes_to_copy);
c_end += bytes_to_copy;
size_t bytes_to_copy = this->byte_size(from_end - from_begin);
memcpy(this->c_end, reinterpret_cast<const void *>(&*from_begin), bytes_to_copy);
this->c_end += bytes_to_copy;
}
void swap(PODArray & rhs)
@ -346,7 +396,7 @@ public:
/// Swap two PODArray objects, arr1 and arr2, that satisfy the following conditions:
/// - The elements of arr1 are stored on stack.
/// - The elements of arr2 are stored on heap.
auto swap_stack_heap = [](PODArray & arr1, PODArray & arr2)
auto swap_stack_heap = [this](PODArray & arr1, PODArray & arr2)
{
size_t stack_size = arr1.size();
size_t stack_allocated = arr1.allocated_bytes();
@ -360,27 +410,27 @@ public:
/// arr1 takes ownership of the heap memory of arr2.
arr1.c_start = arr2.c_start;
arr1.c_end_of_storage = arr1.c_start + heap_allocated - arr1.pad_right;
arr1.c_end = arr1.c_start + byte_size(heap_size);
arr1.c_end = arr1.c_start + this->byte_size(heap_size);
/// Allocate stack space for arr2.
arr2.alloc(stack_allocated);
/// Copy the stack content.
memcpy(arr2.c_start, stack_c_start, byte_size(stack_size));
arr2.c_end = arr2.c_start + byte_size(stack_size);
memcpy(arr2.c_start, stack_c_start, this->byte_size(stack_size));
arr2.c_end = arr2.c_start + this->byte_size(stack_size);
};
auto do_move = [](PODArray & src, PODArray & dest)
auto do_move = [this](PODArray & src, PODArray & dest)
{
if (src.isAllocatedFromStack())
{
dest.dealloc();
dest.alloc(src.allocated_bytes());
memcpy(dest.c_start, src.c_start, byte_size(src.size()));
memcpy(dest.c_start, src.c_start, this->byte_size(src.size()));
dest.c_end = dest.c_start + (src.c_end - src.c_start);
src.c_start = nullptr;
src.c_end = nullptr;
src.c_end_of_storage = nullptr;
src.c_start = Base::null;
src.c_end = Base::null;
src.c_end_of_storage = Base::null;
}
else
{
@ -390,28 +440,28 @@ public:
}
};
if (!isInitialized() && !rhs.isInitialized())
if (!this->isInitialized() && !rhs.isInitialized())
return;
else if (!isInitialized() && rhs.isInitialized())
else if (!this->isInitialized() && rhs.isInitialized())
{
do_move(rhs, *this);
return;
}
else if (isInitialized() && !rhs.isInitialized())
else if (this->isInitialized() && !rhs.isInitialized())
{
do_move(*this, rhs);
return;
}
if (isAllocatedFromStack() && rhs.isAllocatedFromStack())
if (this->isAllocatedFromStack() && rhs.isAllocatedFromStack())
{
size_t min_size = std::min(size(), rhs.size());
size_t max_size = std::max(size(), rhs.size());
size_t min_size = std::min(this->size(), rhs.size());
size_t max_size = std::max(this->size(), rhs.size());
for (size_t i = 0; i < min_size; ++i)
std::swap(this->operator[](i), rhs[i]);
if (size() == max_size)
if (this->size() == max_size)
{
for (size_t i = min_size; i < max_size; ++i)
rhs[i] = this->operator[](i);
@ -422,33 +472,33 @@ public:
this->operator[](i) = rhs[i];
}
size_t lhs_size = size();
size_t lhs_allocated = allocated_bytes();
size_t lhs_size = this->size();
size_t lhs_allocated = this->allocated_bytes();
size_t rhs_size = rhs.size();
size_t rhs_allocated = rhs.allocated_bytes();
c_end_of_storage = c_start + rhs_allocated - pad_right;
rhs.c_end_of_storage = rhs.c_start + lhs_allocated - pad_right;
this->c_end_of_storage = this->c_start + rhs_allocated - Base::pad_right;
rhs.c_end_of_storage = rhs.c_start + lhs_allocated - Base::pad_right;
c_end = c_start + byte_size(rhs_size);
rhs.c_end = rhs.c_start + byte_size(lhs_size);
this->c_end = this->c_start + this->byte_size(rhs_size);
rhs.c_end = rhs.c_start + this->byte_size(lhs_size);
}
else if (isAllocatedFromStack() && !rhs.isAllocatedFromStack())
else if (this->isAllocatedFromStack() && !rhs.isAllocatedFromStack())
swap_stack_heap(*this, rhs);
else if (!isAllocatedFromStack() && rhs.isAllocatedFromStack())
else if (!this->isAllocatedFromStack() && rhs.isAllocatedFromStack())
swap_stack_heap(rhs, *this);
else
{
std::swap(c_start, rhs.c_start);
std::swap(c_end, rhs.c_end);
std::swap(c_end_of_storage, rhs.c_end_of_storage);
std::swap(this->c_start, rhs.c_start);
std::swap(this->c_end, rhs.c_end);
std::swap(this->c_end_of_storage, rhs.c_end_of_storage);
}
}
void assign(size_t n, const T & x)
{
resize(n);
this->resize(n);
std::fill(begin(), end(), x);
}
@ -456,12 +506,12 @@ public:
void assign(It1 from_begin, It2 from_end)
{
size_t required_capacity = from_end - from_begin;
if (required_capacity > capacity())
reserve(roundUpToPowerOfTwoOrZero(required_capacity));
if (required_capacity > this->capacity())
this->reserve(roundUpToPowerOfTwoOrZero(required_capacity));
size_t bytes_to_copy = byte_size(required_capacity);
memcpy(c_start, reinterpret_cast<const void *>(&*from_begin), bytes_to_copy);
c_end = c_start + bytes_to_copy;
size_t bytes_to_copy = this->byte_size(required_capacity);
memcpy(this->c_start, reinterpret_cast<const void *>(&*from_begin), bytes_to_copy);
this->c_end = this->c_start + bytes_to_copy;
}
void assign(const PODArray & from)
@ -472,7 +522,7 @@ public:
bool operator== (const PODArray & other) const
{
if (size() != other.size())
if (this->size() != other.size())
return false;
const_iterator this_it = begin();
@ -504,15 +554,9 @@ void swap(PODArray<T, INITIAL_SIZE, TAllocator, pad_right_> & lhs, PODArray<T, I
/** For columns. The padding is enough to read and write an xmm register at the address of the last element. */
template <typename T, size_t INITIAL_SIZE = 4096, typename TAllocator = Allocator<false>>
using PaddedPODArray = PODArray<T, INITIAL_SIZE, TAllocator, 15>;
inline constexpr size_t integerRound(size_t value, size_t dividend)
{
return ((value + dividend - 1) / dividend) * dividend;
}
using PaddedPODArray = PODArray<T, INITIAL_SIZE, TAllocator, 15, 16>;
template <typename T, size_t stack_size_in_bytes>
using PODArrayWithStackMemory = PODArray<T, 0, AllocatorWithStackMemory<Allocator<false>, integerRound(stack_size_in_bytes, sizeof(T))>>;
using PODArrayWithStackMemory = PODArray<T, 0, AllocatorWithStackMemory<Allocator<false>, integerRoundUp(stack_size_in_bytes, sizeof(T))>>;
}
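The rewrite above moves the byte-level bookkeeping into PODArrayBase and gives PaddedPODArray a 16-byte left pad next to the existing 15-byte right pad, so that accesses such as operator[](-1) and memcpySmallAllowReadWriteOverflow15 stay inside the allocation. A standalone sketch of the padding idea, using plain malloc rather than the real allocator or class:

#include <cstddef>
#include <cstdlib>
#include <cstring>
#include <iostream>

/// Sketch of left/right padding (not the real PODArray): the allocation is padded
/// on both sides, c_start points past the left pad, so reading element [-1]
/// still lands inside the allocation.
struct PaddedBuffer
{
    static constexpr size_t pad_left = 16;
    static constexpr size_t pad_right = 15;

    char * c_start = nullptr;
    size_t bytes = 0;

    explicit PaddedBuffer(size_t data_bytes) : bytes(data_bytes)
    {
        char * raw = static_cast<char *>(std::malloc(pad_left + data_bytes + pad_right));
        std::memset(raw, 0, pad_left + data_bytes + pad_right);
        c_start = raw + pad_left;   /// user data begins after the left pad
    }

    ~PaddedBuffer() { std::free(c_start - pad_left); }
};

int main()
{
    PaddedBuffer buf(64);
    std::memcpy(buf.c_start, "hello", 5);
    /// [-1] points into the zeroed left pad, not before the allocation.
    std::cout << static_cast<int>(buf.c_start[-1]) << '\n';   /// prints 0
}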

View File

@ -5,6 +5,7 @@
#include <Core/Types.h>
#include <Poco/UTF8Encoding.h>
#include <Poco/Unicode.h>
#include <common/unaligned.h>
#include <ext/range.h>
#include <stdint.h>
#include <string.h>
@ -121,9 +122,9 @@ protected:
CRTP & self() { return static_cast<CRTP &>(*this); }
const CRTP & self() const { return const_cast<VolnitskyBase *>(this)->self(); }
static const Ngram & toNGram(const UInt8 * const pos)
static Ngram toNGram(const UInt8 * const pos)
{
return *reinterpret_cast<const Ngram *>(pos);
return unalignedLoad<Ngram>(pos);
}
void putNGramBase(const Ngram ngram, const int offset)
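toNGram now returns the n-gram by value through unalignedLoad instead of dereferencing a reinterpret_cast pointer, which is undefined behaviour when pos is not suitably aligned. A minimal sketch of what such a helper typically does (illustrative, not the real common/unaligned.h):

#include <cstdint>
#include <cstring>
#include <iostream>

/// memcpy into a local instead of dereferencing a possibly misaligned pointer,
/// which would be UB for types with alignment greater than 1.
template <typename T>
T unaligned_load(const void * address)
{
    T result;
    std::memcpy(&result, address, sizeof(result));
    return result;
}

int main()
{
    const char haystack[] = "abcdefgh";
    /// Read a 2-byte "ngram" starting at an odd offset; safe regardless of alignment.
    auto ngram = unaligned_load<uint16_t>(haystack + 1);
    std::cout << std::hex << ngram << '\n';
}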

View File

@ -20,23 +20,21 @@ ZooKeeperNodeCache::ZNode ZooKeeperNodeCache::get(const std::string & path, Even
ZooKeeperNodeCache::ZNode ZooKeeperNodeCache::get(const std::string & path, Coordination::WatchCallback caller_watch_callback)
{
zkutil::ZooKeeperPtr zookeeper;
std::unordered_set<std::string> invalidated_paths;
{
std::lock_guard<std::mutex> lock(context->mutex);
if (!context->zookeeper)
if (context->all_paths_invalidated)
{
/// Possibly, there was a previous session and it has expired. Clear the cache.
path_to_cached_znode.clear();
context->zookeeper = get_zookeeper();
context->all_paths_invalidated = false;
}
zookeeper = context->zookeeper;
invalidated_paths.swap(context->invalidated_paths);
}
zkutil::ZooKeeperPtr zookeeper = get_zookeeper();
if (!zookeeper)
throw DB::Exception("Could not get znode: `" + path + "'. ZooKeeper not configured.", DB::ErrorCodes::NO_ZOOKEEPER);
@ -65,8 +63,8 @@ ZooKeeperNodeCache::ZNode ZooKeeperNodeCache::get(const std::string & path, Coor
changed = owned_context->invalidated_paths.emplace(response.path).second;
else if (response.state == Coordination::EXPIRED_SESSION)
{
owned_context->zookeeper = nullptr;
owned_context->invalidated_paths.clear();
owned_context->all_paths_invalidated = true;
changed = true;
}
}
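The cache no longer keeps a ZooKeeper pointer in the shared Context; on session expiration the watch callback just flips all_paths_invalidated under the mutex, and the next get() drops the whole cache. A hedged sketch of that pattern with plain std containers (illustrative names, not the real zkutil types):

#include <iostream>
#include <mutex>
#include <string>
#include <unordered_map>
#include <unordered_set>

/// Watch callbacks only record what became invalid; the reader drains that state
/// under the same mutex on its next call.
struct CacheContext
{
    std::mutex mutex;
    std::unordered_set<std::string> invalidated_paths;
    bool all_paths_invalidated = false;
};

struct NodeCache
{
    CacheContext context;
    std::unordered_map<std::string, std::string> path_to_cached_value;

    void onSessionExpired()
    {
        std::lock_guard<std::mutex> lock(context.mutex);
        context.invalidated_paths.clear();
        context.all_paths_invalidated = true;   /// cheaper than enumerating every cached path
    }

    std::unordered_set<std::string> drainInvalidated()
    {
        std::unordered_set<std::string> invalidated;
        std::lock_guard<std::mutex> lock(context.mutex);
        if (context.all_paths_invalidated)
        {
            path_to_cached_value.clear();       /// previous session expired: drop everything
            context.all_paths_invalidated = false;
        }
        invalidated.swap(context.invalidated_paths);
        return invalidated;
    }
};

int main()
{
    NodeCache cache;
    cache.path_to_cached_value["/clickhouse/task_queue"] = "value";
    cache.onSessionExpired();
    cache.drainInvalidated();
    std::cout << cache.path_to_cached_value.size() << '\n';   /// prints 0
}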

View File

@ -53,8 +53,8 @@ private:
struct Context
{
std::mutex mutex;
zkutil::ZooKeeperPtr zookeeper;
std::unordered_set<std::string> invalidated_paths;
bool all_paths_invalidated = false;
};
std::shared_ptr<Context> context;

View File

@ -3,10 +3,12 @@
#include <cstdint>
#include <limits>
#include <Core/Defines.h>
/// On overflow, the function returns an unspecified value.
inline uint64_t intExp2(int x)
inline NO_SANITIZE_UNDEFINED uint64_t intExp2(int x)
{
return 1ULL << x;
}
@ -32,7 +34,8 @@ inline uint64_t intExp10(int x)
return table[x];
}
namespace common {
namespace common
{
inline int exp10_i32(int x)
{
@ -123,4 +126,4 @@ inline __int128 exp10_i128(int x)
return values[x];
}
} // common
}
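intExp2 is deliberately allowed to overflow (NO_SANITIZE_UNDEFINED suppresses the UBSan report), so callers must keep x in range themselves. A small sketch contrasting the fast path with a checked variant; the checked version and its clamping are an assumption for illustration, not part of the patch:

#include <cstdint>
#include <iostream>

inline uint64_t int_exp2_fast(int x)
{
    return 1ULL << x;                    /// undefined for x < 0 or x >= 64; callers keep x in [0, 63]
}

inline uint64_t int_exp2_checked(int x)
{
    if (x < 0 || x >= 64)
        return 0;                        /// arbitrary but well-defined
    return 1ULL << x;
}

int main()
{
    std::cout << int_exp2_fast(10) << ' ' << int_exp2_checked(70) << '\n';   /// 1024 0
}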

View File

@ -1,5 +1,5 @@
add_executable (hashes_test hashes_test.cpp)
target_link_libraries (hashes_test PRIVATE dbms ${OPENSSL_CRYPTO_LIBRARY})
target_link_libraries (hashes_test PRIVATE clickhouse_common_io ${OPENSSL_CRYPTO_LIBRARY} ${CITYHASH_LIBRARIES})
add_executable (sip_hash sip_hash.cpp)
target_link_libraries (sip_hash PRIVATE clickhouse_common_io)
@ -20,10 +20,10 @@ add_executable (small_table small_table.cpp)
target_link_libraries (small_table PRIVATE clickhouse_common_io)
add_executable (parallel_aggregation parallel_aggregation.cpp)
target_link_libraries (parallel_aggregation PRIVATE clickhouse_common_io)
target_link_libraries (parallel_aggregation PRIVATE clickhouse_compression clickhouse_common_io)
add_executable (parallel_aggregation2 parallel_aggregation2.cpp)
target_link_libraries (parallel_aggregation2 PRIVATE clickhouse_common_io)
target_link_libraries (parallel_aggregation2 PRIVATE clickhouse_compression clickhouse_common_io)
add_executable (int_hashes_perf int_hashes_perf.cpp AvalancheTest.cpp Random.cpp)
target_link_libraries (int_hashes_perf PRIVATE clickhouse_common_io)
@ -42,7 +42,7 @@ add_executable (shell_command_test shell_command_test.cpp)
target_link_libraries (shell_command_test PRIVATE clickhouse_common_io)
add_executable (arena_with_free_lists arena_with_free_lists.cpp)
target_link_libraries (arena_with_free_lists PRIVATE clickhouse_common_io)
target_link_libraries (arena_with_free_lists PRIVATE clickhouse_compression clickhouse_common_io)
add_executable (pod_array pod_array.cpp)
target_link_libraries (pod_array PRIVATE clickhouse_common_io)
@ -61,7 +61,7 @@ target_link_libraries (space_saving PRIVATE clickhouse_common_io)
add_executable (integer_hash_tables_and_hashes integer_hash_tables_and_hashes.cpp)
target_include_directories (integer_hash_tables_and_hashes SYSTEM BEFORE PRIVATE ${SPARCEHASH_INCLUDE_DIR})
target_link_libraries (integer_hash_tables_and_hashes PRIVATE clickhouse_common_io)
target_link_libraries (integer_hash_tables_and_hashes PRIVATE clickhouse_compression clickhouse_common_io)
add_executable (allocator allocator.cpp)
target_link_libraries (allocator PRIVATE clickhouse_common_io)

View File

@ -20,7 +20,7 @@
#include <Core/Field.h>
#include <Common/Stopwatch.h>
#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/CompressedReadBuffer.h>
#include <Compression/CompressedReadBuffer.h>
#include <IO/ReadHelpers.h>
using namespace DB;

View File

@ -14,7 +14,7 @@
#include <Core/Types.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/CompressedReadBuffer.h>
#include <Compression/CompressedReadBuffer.h>
#include <Common/HashTable/HashMap.h>
#include <Common/SipHash.h>

View File

@ -13,7 +13,7 @@
//#include <Common/HashTable/HashTableMerge.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/CompressedReadBuffer.h>
#include <Compression/CompressedReadBuffer.h>
#include <Common/Stopwatch.h>
#include <common/ThreadPool.h>

View File

@ -13,7 +13,7 @@
//#include <Common/HashTable/HashTableMerge.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/CompressedReadBuffer.h>
#include <Compression/CompressedReadBuffer.h>
#include <Common/Stopwatch.h>
#include <common/ThreadPool.h>

View File

@ -0,0 +1,16 @@
include(${ClickHouse_SOURCE_DIR}/cmake/dbms_glob_sources.cmake)
add_headers_and_sources(clickhouse_compression .)
add_library(clickhouse_compression ${LINK_MODE} ${clickhouse_compression_headers} ${clickhouse_compression_sources})
target_link_libraries(clickhouse_compression PRIVATE clickhouse_parsers clickhouse_common_io ${ZSTD_LIBRARY} ${LZ4_LIBRARY})
target_include_directories(clickhouse_compression PUBLIC ${DBMS_INCLUDE_DIR})
if (NOT USE_INTERNAL_LZ4_LIBRARY)
target_include_directories(clickhouse_compression SYSTEM BEFORE PRIVATE ${LZ4_INCLUDE_DIR})
endif ()
if (NOT USE_INTERNAL_ZSTD_LIBRARY)
target_include_directories(clickhouse_compression SYSTEM BEFORE PRIVATE ${ZSTD_INCLUDE_DIR})
endif ()
if(ENABLE_TESTS)
add_subdirectory(tests)
endif()

View File

@ -1,8 +1,9 @@
#include "CachedCompressedReadBuffer.h"
#include <IO/createReadBufferFromFileBase.h>
#include <IO/CachedCompressedReadBuffer.h>
#include <IO/WriteHelpers.h>
#include <IO/CompressedStream.h>
#include <IO/LZ4_decompress_faster.h>
#include <Compression/CompressionInfo.h>
#include <Compression/LZ4_decompress_faster.h>
namespace DB
@ -30,7 +31,6 @@ void CachedCompressedReadBuffer::initInput()
bool CachedCompressedReadBuffer::nextImpl()
{
/// Check for the presence of a decompressed block in the cache and grab ownership of that block if it exists.
UInt128 key = cache->hash(path, file_pos);
owned_cell = cache->get(key);
@ -42,14 +42,15 @@ bool CachedCompressedReadBuffer::nextImpl()
owned_cell = std::make_shared<UncompressedCacheCell>();
size_t size_decompressed;
size_t size_compressed_without_checksum;
owned_cell->compressed_size = readCompressedData(size_decompressed, size_compressed_without_checksum);
if (owned_cell->compressed_size)
{
owned_cell->data.resize(size_decompressed + LZ4::ADDITIONAL_BYTES_AT_END_OF_BUFFER);
decompress(owned_cell->data.data(), size_decompressed, size_compressed_without_checksum);
owned_cell->data.resize(size_decompressed + codec->getAdditionalSizeAtTheEndOfBuffer());
decompress(owned_cell->data.data(), size_decompressed, owned_cell->compressed_size);
/// Put data into cache.
cache->set(key, owned_cell);
@ -62,7 +63,7 @@ bool CachedCompressedReadBuffer::nextImpl()
return false;
}
working_buffer = Buffer(owned_cell->data.data(), owned_cell->data.data() + owned_cell->data.size() - LZ4::ADDITIONAL_BYTES_AT_END_OF_BUFFER);
working_buffer = Buffer(owned_cell->data.data(), owned_cell->data.data() + owned_cell->data.size() - codec->getAdditionalSizeAtTheEndOfBuffer());
file_pos += owned_cell->compressed_size;
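nextImpl now asks the cache for a cell keyed by (path, file position) and only reads and decompresses on a miss, then publishes the cell. The same get-or-compute shape with plain std containers standing in for UncompressedCache (the key format and names here are assumptions):

#include <cstddef>
#include <functional>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

struct UncompressedCell
{
    std::vector<char> data;
    size_t compressed_size = 0;
};

using Cache = std::unordered_map<std::string, std::shared_ptr<UncompressedCell>>;

std::shared_ptr<UncompressedCell> getOrDecompress(
    Cache & cache, const std::string & path, size_t file_pos,
    const std::function<std::shared_ptr<UncompressedCell>()> & read_and_decompress)
{
    std::string key = path + '@' + std::to_string(file_pos);
    if (auto it = cache.find(key); it != cache.end())
        return it->second;                       /// cache hit: reuse the decompressed block
    auto cell = read_and_decompress();           /// cache miss: do the work once
    cache.emplace(key, cell);
    return cell;
}

int main()
{
    Cache cache;
    auto cell = getOrDecompress(cache, "/data/part.bin", 0, []
    {
        auto c = std::make_shared<UncompressedCell>();
        c->data = {'a', 'b', 'c'};               /// pretend this came from readCompressedData + decompress
        c->compressed_size = 3;
        return c;
    });
    return cell->data.size() == 3 ? 0 : 1;
}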

View File

@ -3,7 +3,7 @@
#include <memory>
#include <time.h>
#include <IO/ReadBufferFromFileBase.h>
#include <IO/CompressedReadBufferBase.h>
#include "CompressedReadBufferBase.h"
#include <IO/UncompressedCache.h>
#include <port/clock.h>

View File

@ -1,6 +1,6 @@
#include <IO/CompressedReadBuffer.h>
#include <IO/CompressedStream.h>
#include <IO/LZ4_decompress_faster.h>
#include "CompressedReadBuffer.h"
#include <Compression/CompressionInfo.h>
#include <Compression/LZ4_decompress_faster.h>
namespace DB
@ -14,7 +14,7 @@ bool CompressedReadBuffer::nextImpl()
if (!size_compressed)
return false;
memory.resize(size_decompressed + LZ4::ADDITIONAL_BYTES_AT_END_OF_BUFFER);
memory.resize(size_decompressed + codec->getAdditionalSizeAtTheEndOfBuffer());
working_buffer = Buffer(memory.data(), &memory[size_decompressed]);
decompress(working_buffer.begin(), size_decompressed, size_compressed_without_checksum);
@ -40,7 +40,7 @@ size_t CompressedReadBuffer::readBig(char * to, size_t n)
return bytes_read;
/// If the decompressed block fits entirely into the destination buffer, decompress directly into it.
if (size_decompressed + LZ4::ADDITIONAL_BYTES_AT_END_OF_BUFFER <= n - bytes_read)
if (size_decompressed + codec->getAdditionalSizeAtTheEndOfBuffer() <= n - bytes_read)
{
decompress(to + bytes_read, size_decompressed, size_compressed_without_checksum);
bytes_read += size_decompressed;
@ -49,7 +49,7 @@ size_t CompressedReadBuffer::readBig(char * to, size_t n)
else
{
bytes += offset();
memory.resize(size_decompressed + LZ4::ADDITIONAL_BYTES_AT_END_OF_BUFFER);
memory.resize(size_decompressed + codec->getAdditionalSizeAtTheEndOfBuffer());
working_buffer = Buffer(memory.data(), &memory[size_decompressed]);
pos = working_buffer.begin();
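readBig decompresses straight into the caller's buffer whenever the block plus the codec's scratch bytes (getAdditionalSizeAtTheEndOfBuffer) still fits into what remains of it, and only falls back to its own memory otherwise. A sketch of just that decision, with a lambda standing in for the real codec call:

#include <cstddef>
#include <cstring>
#include <functional>
#include <vector>

void read_block(char * to, size_t n, size_t & bytes_read,
                size_t size_decompressed, size_t additional_bytes,
                std::vector<char> & internal_memory,
                const std::function<void(char *)> & decompress_block)
{
    if (size_decompressed + additional_bytes <= n - bytes_read)
    {
        decompress_block(to + bytes_read);            /// zero-copy fast path
        bytes_read += size_decompressed;
    }
    else
    {
        internal_memory.resize(size_decompressed + additional_bytes);
        decompress_block(internal_memory.data());     /// slow path via own memory
    }
}

int main()
{
    std::vector<char> destination(32), internal;
    size_t bytes_read = 0;
    read_block(destination.data(), destination.size(), bytes_read,
               /*size_decompressed=*/16, /*additional_bytes=*/15, internal,
               [](char * out) { std::memset(out, 'x', 16); });
    return bytes_read == 16 ? 0 : 1;
}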

View File

@ -1,6 +1,6 @@
#pragma once
#include <IO/CompressedReadBufferBase.h>
#include "CompressedReadBufferBase.h"
#include <IO/BufferWithOwnMemory.h>
#include <IO/ReadBuffer.h>

View File

@ -0,0 +1,140 @@
#include "CompressedReadBufferBase.h"
#include <vector>
#include <string.h>
#include <city.h>
#include <zstd.h>
#include <Common/PODArray.h>
#include <Common/ProfileEvents.h>
#include <Common/Exception.h>
#include <Common/hex.h>
#include <common/unaligned.h>
#include <Compression/ICompressionCodec.h>
#include <Compression/CompressionFactory.h>
#include <IO/ReadBuffer.h>
#include <IO/BufferWithOwnMemory.h>
#include <Compression/CompressionInfo.h>
#include <IO/WriteHelpers.h>
namespace ProfileEvents
{
extern const Event ReadCompressedBytes;
extern const Event CompressedReadBufferBlocks;
extern const Event CompressedReadBufferBytes;
}
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_COMPRESSION_METHOD;
extern const int TOO_LARGE_SIZE_COMPRESSED;
extern const int CHECKSUM_DOESNT_MATCH;
extern const int CANNOT_DECOMPRESS;
}
static constexpr auto CHECKSUM_SIZE{sizeof(CityHash_v1_0_2::uint128)};
/// Read compressed data into compressed_buffer. Get the size of the decompressed data from the block header. Verify the checksum if needed.
/// Returns number of compressed bytes read.
size_t CompressedReadBufferBase::readCompressedData(size_t & size_decompressed, size_t & size_compressed_without_checksum)
{
if (compressed_in->eof())
return 0;
CityHash_v1_0_2::uint128 checksum;
compressed_in->readStrict(reinterpret_cast<char *>(&checksum), CHECKSUM_SIZE);
UInt8 header_size = ICompressionCodec::getHeaderSize();
own_compressed_buffer.resize(header_size);
compressed_in->readStrict(own_compressed_buffer.data(), header_size);
UInt8 method = ICompressionCodec::readMethod(own_compressed_buffer.data());
if (!codec)
codec = CompressionCodecFactory::instance().get(method);
else if (method != codec->getMethodByte())
throw Exception("Data compressed with different methods, given method byte "
+ getHexUIntLowercase(method)
+ ", previous method byte "
+ getHexUIntLowercase(codec->getMethodByte()),
ErrorCodes::CANNOT_DECOMPRESS);
size_compressed_without_checksum = ICompressionCodec::readCompressedBlockSize(own_compressed_buffer.data());
size_decompressed = ICompressionCodec::readDecompressedBlockSize(own_compressed_buffer.data());
if (size_compressed_without_checksum > DBMS_MAX_COMPRESSED_SIZE)
throw Exception("Too large size_compressed_without_checksum: "
+ toString(size_compressed_without_checksum)
+ ". Most likely corrupted data.",
ErrorCodes::TOO_LARGE_SIZE_COMPRESSED);
ProfileEvents::increment(ProfileEvents::ReadCompressedBytes, size_compressed_without_checksum + CHECKSUM_SIZE);
/// Is whole compressed block located in 'compressed_in->' buffer?
if (compressed_in->offset() >= header_size &&
compressed_in->position() + size_compressed_without_checksum + codec->getAdditionalSizeAtTheEndOfBuffer() - header_size <= compressed_in->buffer().end())
{
compressed_in->position() -= header_size;
compressed_buffer = compressed_in->position();
compressed_in->position() += size_compressed_without_checksum;
}
else
{
own_compressed_buffer.resize(size_compressed_without_checksum + codec->getAdditionalSizeAtTheEndOfBuffer());
compressed_buffer = own_compressed_buffer.data();
compressed_in->readStrict(compressed_buffer + header_size, size_compressed_without_checksum - header_size);
}
if (!disable_checksum)
{
auto checksum_calculated = CityHash_v1_0_2::CityHash128(compressed_buffer, size_compressed_without_checksum);
if (checksum != checksum_calculated)
throw Exception("Checksum doesn't match: corrupted data."
" Reference: " + getHexUIntLowercase(checksum.first) + getHexUIntLowercase(checksum.second)
+ ". Actual: " + getHexUIntLowercase(checksum_calculated.first) + getHexUIntLowercase(checksum_calculated.second)
+ ". Size of compressed block: " + toString(size_compressed_without_checksum),
ErrorCodes::CHECKSUM_DOESNT_MATCH);
}
return size_compressed_without_checksum + CHECKSUM_SIZE;
}
void CompressedReadBufferBase::decompress(char * to, size_t size_decompressed, size_t size_compressed_without_checksum)
{
ProfileEvents::increment(ProfileEvents::CompressedReadBufferBlocks);
ProfileEvents::increment(ProfileEvents::CompressedReadBufferBytes, size_decompressed);
UInt8 method = ICompressionCodec::readMethod(compressed_buffer);
if (!codec)
codec = CompressionCodecFactory::instance().get(method);
else if (codec->getMethodByte() != method)
throw Exception("Data compressed with different methods, given method byte "
+ getHexUIntLowercase(method)
+ ", previous method byte "
+ getHexUIntLowercase(codec->getMethodByte()),
ErrorCodes::CANNOT_DECOMPRESS);
codec->decompress(compressed_buffer, size_compressed_without_checksum, to);
}
/// 'compressed_in' can be initialized lazily, but it must be set before the first call to 'readCompressedData'.
CompressedReadBufferBase::CompressedReadBufferBase(ReadBuffer * in)
: compressed_in(in), own_compressed_buffer(0)
{
}
CompressedReadBufferBase::~CompressedReadBufferBase() = default; /// Proper destruction of unique_ptr of forward-declared type.
}
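readCompressedData above fixes the on-disk frame: a 16-byte CityHash128 checksum, then a 9-byte header (1 method byte, a 4-byte compressed size that includes the header, a 4-byte decompressed size), then the compressed data. A tiny parser for just the header, assuming a little-endian host as ClickHouse does; 0x82 is the LZ4 method byte from CompressionInfo.h:

#include <cstdint>
#include <cstring>
#include <iostream>
#include <vector>

/// Layout per block: [16B checksum][1B method][4B compressed size][4B decompressed size][data...]
struct BlockHeader
{
    uint8_t method = 0;
    uint32_t compressed_size = 0;       /// includes the 9-byte header itself
    uint32_t decompressed_size = 0;
};

BlockHeader parse_header(const char * p)
{
    BlockHeader h;
    h.method = static_cast<uint8_t>(p[0]);
    std::memcpy(&h.compressed_size, p + 1, 4);     /// little-endian assumed
    std::memcpy(&h.decompressed_size, p + 5, 4);
    return h;
}

int main()
{
    std::vector<char> header = {char(0x82), 13, 0, 0, 0, 100, 0, 0, 0};   /// LZ4, 13 bytes compressed, 100 decompressed
    BlockHeader h = parse_header(header.data());
    std::cout << int(h.method) << ' ' << h.compressed_size << ' ' << h.decompressed_size << '\n';
}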

View File

@ -1,7 +1,8 @@
#pragma once
#include <Common/PODArray.h>
#include <IO/LZ4_decompress_faster.h>
#include <Compression/LZ4_decompress_faster.h>
#include <Compression/ICompressionCodec.h>
namespace DB
@ -25,9 +26,6 @@ protected:
/// Don't checksum on decompressing.
bool disable_checksum = false;
LZ4::PerformanceStatistics lz4_stat;
/// Read compressed data into compressed_buffer. Get the size of the decompressed data from the block header. Verify the checksum if needed.
/// Returns number of compressed bytes read.
size_t readCompressedData(size_t & size_decompressed, size_t & size_compressed_without_checksum);
@ -47,6 +45,9 @@ public:
{
disable_checksum = true;
}
public:
CompressionCodecPtr codec;
};
}

View File

@ -1,8 +1,9 @@
#include <IO/CompressedReadBufferFromFile.h>
#include "CompressedReadBufferFromFile.h"
#include <IO/createReadBufferFromFileBase.h>
#include <IO/WriteHelpers.h>
#include <IO/CompressedStream.h>
#include <IO/LZ4_decompress_faster.h>
#include <Compression/CompressionInfo.h>
#include <Compression/LZ4_decompress_faster.h>
namespace DB

View File

@ -1,6 +1,6 @@
#pragma once
#include <IO/CompressedReadBufferBase.h>
#include "CompressedReadBufferBase.h"
#include <IO/ReadBufferFromFileBase.h>
#include <time.h>
#include <memory>

View File

@ -0,0 +1,62 @@
#include <memory>
#include <city.h>
#include <lz4.h>
#include <lz4hc.h>
#include <zstd.h>
#include <string.h>
#include <common/unaligned.h>
#include <Core/Types.h>
#include "CompressedWriteBuffer.h"
#include <Compression/CompressionFactory.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_COMPRESS;
extern const int UNKNOWN_COMPRESSION_METHOD;
}
static constexpr auto CHECKSUM_SIZE{sizeof(CityHash_v1_0_2::uint128)};
void CompressedWriteBuffer::nextImpl()
{
if (!offset())
return;
size_t decompressed_size = offset();
UInt32 compressed_reserve_size = codec->getCompressedReserveSize(decompressed_size);
compressed_buffer.resize(compressed_reserve_size);
UInt32 compressed_size = codec->compress(working_buffer.begin(), decompressed_size, compressed_buffer.data());
CityHash_v1_0_2::uint128 checksum = CityHash_v1_0_2::CityHash128(compressed_buffer.data(), compressed_size);
out.write(reinterpret_cast<const char *>(&checksum), CHECKSUM_SIZE);
out.write(compressed_buffer.data(), compressed_size);
}
CompressedWriteBuffer::CompressedWriteBuffer(
WriteBuffer & out_,
CompressionCodecPtr codec_,
size_t buf_size)
: BufferWithOwnMemory<WriteBuffer>(buf_size), out(out_), codec(codec_)
{
}
CompressedWriteBuffer::~CompressedWriteBuffer()
{
try
{
next();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}
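On the write side the framing is the mirror image: compress into a scratch buffer, hash the compressed bytes, then write the 16-byte checksum followed by the block. A sketch with a dummy 16-byte checksum in place of CityHash128 so it has no external dependencies:

#include <cstddef>
#include <cstdint>
#include <ostream>
#include <sstream>
#include <vector>

struct Checksum128 { uint64_t lo = 0, hi = 0; };

Checksum128 dummy_checksum(const char * data, size_t size)   /// placeholder, NOT CityHash
{
    Checksum128 c;
    for (size_t i = 0; i < size; ++i)
        c.lo = c.lo * 1099511628211ULL + static_cast<unsigned char>(data[i]);
    return c;
}

void write_block(std::ostream & out, const std::vector<char> & compressed_with_header)
{
    Checksum128 checksum = dummy_checksum(compressed_with_header.data(), compressed_with_header.size());
    out.write(reinterpret_cast<const char *>(&checksum), sizeof(checksum));     /// 16 bytes first
    out.write(compressed_with_header.data(), compressed_with_header.size());    /// then header + data
}

int main()
{
    std::ostringstream out;
    write_block(out, {'\x82', 12, 0, 0, 0, 3, 0, 0, 0, 'a', 'b', 'c'});   /// 9-byte header + 3 payload bytes
    return out.str().size() == 16 + 12 ? 0 : 1;
}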

View File

@ -6,7 +6,8 @@
#include <IO/WriteBuffer.h>
#include <IO/BufferWithOwnMemory.h>
#include <IO/CompressionSettings.h>
#include <Compression/ICompressionCodec.h>
#include <Compression/CompressionFactory.h>
namespace DB
@ -16,7 +17,7 @@ class CompressedWriteBuffer : public BufferWithOwnMemory<WriteBuffer>
{
private:
WriteBuffer & out;
CompressionSettings compression_settings;
CompressionCodecPtr codec;
PODArray<char> compressed_buffer;
@ -25,7 +26,7 @@ private:
public:
CompressedWriteBuffer(
WriteBuffer & out_,
CompressionSettings compression_settings = CompressionSettings(),
CompressionCodecPtr codec_ = CompressionCodecFactory::instance().getDefaultCodec(),
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE);
/// The amount of compressed data

View File

@ -0,0 +1,100 @@
#include "CompressionCodecLZ4.h"
#include <lz4.h>
#include <lz4hc.h>
#include <Compression/CompressionInfo.h>
#include <Compression/CompressionFactory.h>
#include <Compression/LZ4_decompress_faster.h>
#include "CompressionCodecLZ4.h"
#include <Parsers/IAST.h>
#include <Parsers/ASTLiteral.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_COMPRESS;
extern const int ILLEGAL_SYNTAX_FOR_CODEC_TYPE;
extern const int ILLEGAL_CODEC_PARAMETER;
}
UInt8 CompressionCodecLZ4::getMethodByte() const
{
return static_cast<UInt8>(CompressionMethodByte::LZ4);
}
String CompressionCodecLZ4::getCodecDesc() const
{
return "LZ4";
}
UInt32 CompressionCodecLZ4::getCompressedDataSize(UInt32 uncompressed_size) const
{
return LZ4_COMPRESSBOUND(uncompressed_size);
}
UInt32 CompressionCodecLZ4::doCompressData(const char * source, UInt32 source_size, char * dest) const
{
return LZ4_compress_default(source, dest, source_size, LZ4_COMPRESSBOUND(source_size));
}
void CompressionCodecLZ4::doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const
{
LZ4::decompress(source, dest, source_size, uncompressed_size, lz4_stat);
}
void registerCodecLZ4(CompressionCodecFactory & factory)
{
factory.registerSimpleCompressionCodec("LZ4", static_cast<UInt8>(CompressionMethodByte::LZ4), [&] ()
{
return std::make_shared<CompressionCodecLZ4>();
});
}
String CompressionCodecLZ4HC::getCodecDesc() const
{
return "LZ4HC";
}
UInt32 CompressionCodecLZ4HC::doCompressData(const char * source, UInt32 source_size, char * dest) const
{
auto success = LZ4_compress_HC(source, dest, source_size, LZ4_COMPRESSBOUND(source_size), level);
if (!success)
throw Exception("Cannot LZ4_compress_HC", ErrorCodes::CANNOT_COMPRESS);
return success;
}
void registerCodecLZ4HC(CompressionCodecFactory & factory)
{
factory.registerCompressionCodec("LZ4HC", {}, [&](const ASTPtr & arguments) -> CompressionCodecPtr
{
int level = 0;
if (arguments && !arguments->children.empty())
{
if (arguments->children.size() > 1)
throw Exception("LZ4HC codec must have 1 parameter, given " + std::to_string(arguments->children.size()), ErrorCodes::ILLEGAL_SYNTAX_FOR_CODEC_TYPE);
const auto children = arguments->children;
const ASTLiteral * literal = static_cast<const ASTLiteral *>(children[0].get());
level = literal->value.safeGet<UInt64>();
}
return std::make_shared<CompressionCodecLZ4HC>(level);
});
}
CompressionCodecLZ4HC::CompressionCodecLZ4HC(int level_)
: level(level_)
{
}
}
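The codec is a thin wrapper over the raw LZ4 block API used above (LZ4_compress_default, LZ4_COMPRESSBOUND). A stand-alone round trip with that API, without the ClickHouse header and checksum framing; it assumes liblz4 headers are available and the binary links with -llz4:

#include <lz4.h>

#include <cstring>
#include <iostream>
#include <string>
#include <vector>

int main()
{
    std::string source(1000, 'a');
    std::vector<char> compressed(LZ4_COMPRESSBOUND(source.size()));
    int compressed_size = LZ4_compress_default(source.data(), compressed.data(),
                                               static_cast<int>(source.size()),
                                               static_cast<int>(compressed.size()));

    std::vector<char> restored(source.size());
    int restored_size = LZ4_decompress_safe(compressed.data(), restored.data(),
                                            compressed_size, static_cast<int>(restored.size()));

    std::cout << compressed_size << " -> " << restored_size << '\n';
    return std::memcmp(source.data(), restored.data(), source.size()) != 0;
}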

View File

@ -0,0 +1,47 @@
#pragma once
#include <IO/WriteBuffer.h>
#include <Compression/ICompressionCodec.h>
#include <IO/BufferWithOwnMemory.h>
#include <Parsers/StringRange.h>
#include <Compression/LZ4_decompress_faster.h>
namespace DB
{
class CompressionCodecLZ4 : public ICompressionCodec
{
public:
UInt8 getMethodByte() const override;
String getCodecDesc() const override;
UInt32 getAdditionalSizeAtTheEndOfBuffer() const override { return LZ4::ADDITIONAL_BYTES_AT_END_OF_BUFFER; }
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
private:
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;
UInt32 getCompressedDataSize(UInt32 uncompressed_size) const override;
mutable LZ4::PerformanceStatistics lz4_stat;
};
class CompressionCodecLZ4HC : public CompressionCodecLZ4
{
public:
CompressionCodecLZ4HC(int level_);
String getCodecDesc() const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
private:
int level;
};
}

View File

@ -0,0 +1,115 @@
#include <Compression/CompressionCodecMultiple.h>
#include <Compression/CompressionInfo.h>
#include <common/unaligned.h>
#include <Compression/CompressionFactory.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Common/hex.h>
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_CODEC;
extern const int CORRUPTED_DATA;
}
CompressionCodecMultiple::CompressionCodecMultiple(Codecs codecs)
: codecs(codecs)
{
for (size_t idx = 0; idx < codecs.size(); idx++)
{
if (idx != 0)
codec_desc = codec_desc + ',';
const auto codec = codecs[idx];
codec_desc = codec_desc + codec->getCodecDesc();
}
}
UInt8 CompressionCodecMultiple::getMethodByte() const
{
return static_cast<UInt8>(CompressionMethodByte::Multiple);
}
String CompressionCodecMultiple::getCodecDesc() const
{
return codec_desc;
}
UInt32 CompressionCodecMultiple::getCompressedDataSize(UInt32 uncompressed_size) const
{
UInt32 compressed_size = uncompressed_size;
for (auto & codec : codecs)
compressed_size = codec->getCompressedReserveSize(compressed_size);
/// TotalCodecs ByteForEachCodec data
return sizeof(UInt8) + codecs.size() + compressed_size;
}
UInt32 CompressionCodecMultiple::doCompressData(const char * source, UInt32 source_size, char * dest) const
{
PODArray<char> compressed_buf;
PODArray<char> uncompressed_buf(source, source + source_size);
dest[0] = static_cast<UInt8>(codecs.size());
size_t codecs_byte_pos = 1;
for (size_t idx = 0; idx < codecs.size(); ++idx, ++codecs_byte_pos)
{
const auto codec = codecs[idx];
dest[codecs_byte_pos] = codec->getMethodByte();
compressed_buf.resize(codec->getCompressedReserveSize(source_size));
UInt32 size_compressed = codec->compress(uncompressed_buf.data(), source_size, compressed_buf.data());
uncompressed_buf.swap(compressed_buf);
source_size = size_compressed;
}
memcpy(&dest[1 + codecs.size()], uncompressed_buf.data(), source_size);
return 1 + codecs.size() + source_size;
}
void CompressionCodecMultiple::doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 decompressed_size) const
{
UInt8 compression_methods_size = source[0];
PODArray<char> compressed_buf(&source[compression_methods_size + 1], &source[source_size]);
PODArray<char> uncompressed_buf;
/// Insert all data into compressed buf
source_size -= (compression_methods_size + 1);
for (long idx = compression_methods_size - 1; idx >= 0; --idx)
{
UInt8 compression_method = source[idx + 1];
const auto codec = CompressionCodecFactory::instance().get(compression_method);
compressed_buf.resize(compressed_buf.size() + codec->getAdditionalSizeAtTheEndOfBuffer());
UInt32 uncompressed_size = ICompressionCodec::readDecompressedBlockSize(compressed_buf.data());
if (idx == 0 && uncompressed_size != decompressed_size)
throw Exception("Wrong final decompressed size in codec Multiple, got " + toString(uncompressed_size) + ", expected " + toString(decompressed_size), ErrorCodes::CORRUPTED_DATA);
uncompressed_buf.resize(uncompressed_size + codec->getAdditionalSizeAtTheEndOfBuffer());
codec->decompress(compressed_buf.data(), source_size, uncompressed_buf.data());
uncompressed_buf.swap(compressed_buf);
source_size = uncompressed_size;
}
memcpy(dest, compressed_buf.data(), decompressed_size);
}
void registerCodecMultiple(CompressionCodecFactory & factory)
{
factory.registerSimpleCompressionCodec("Multiple", static_cast<UInt8>(CompressionMethodByte::Multiple), [&] ()
{
return std::make_shared<CompressionCodecMultiple>();
});
}
}
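CompressionCodecMultiple writes one byte with the codec count, one method byte per codec, and then the payload compressed by applying the codecs left to right; decompression walks them right to left. A toy pipeline showing only that ordering rule (the transforms are self-inverse placeholders, not real codecs, and the count/method-byte prefix is omitted):

#include <algorithm>
#include <functional>
#include <iostream>
#include <string>
#include <vector>

using Transform = std::function<std::string(const std::string &)>;

std::string apply_pipeline(std::string data, const std::vector<Transform> & steps, bool reverse)
{
    auto order = steps;
    if (reverse)
        std::reverse(order.begin(), order.end());   /// decompression walks the chain backwards
    for (const auto & step : order)
        data = step(data);
    return data;
}

int main()
{
    Transform rev = [](const std::string & s) { return std::string(s.rbegin(), s.rend()); };
    Transform rot = [](const std::string & s) { std::string r = s; for (char & c : r) c ^= 0x20; return r; };
    std::vector<Transform> compress_steps = {rev, rot};

    std::string compressed = apply_pipeline("Delta,LZ4", compress_steps, /*reverse=*/false);
    /// Each toy transform is its own inverse, so replaying the steps in reverse order restores the input.
    std::cout << apply_pipeline(compressed, compress_steps, /*reverse=*/true) << '\n';
}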

View File

@ -0,0 +1,31 @@
#pragma once
#include <Compression/ICompressionCodec.h>
namespace DB
{
class CompressionCodecMultiple final : public ICompressionCodec
{
public:
CompressionCodecMultiple() = default;
explicit CompressionCodecMultiple(Codecs codecs);
UInt8 getMethodByte() const override;
String getCodecDesc() const override;
UInt32 getCompressedDataSize(UInt32 uncompressed_size) const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;
private:
Codecs codecs;
String codec_desc;
};
}

View File

@ -0,0 +1,38 @@
#include <Compression/CompressionCodecNone.h>
#include <Compression/CompressionInfo.h>
#include <Compression/CompressionFactory.h>
namespace DB
{
UInt8 CompressionCodecNone::getMethodByte() const
{
return static_cast<UInt8>(CompressionMethodByte::NONE);
}
String CompressionCodecNone::getCodecDesc() const
{
return "NONE";
}
UInt32 CompressionCodecNone::doCompressData(const char * source, UInt32 source_size, char * dest) const
{
memcpy(dest, source, source_size);
return source_size;
}
void CompressionCodecNone::doDecompressData(const char * source, UInt32 /*source_size*/, char * dest, UInt32 uncompressed_size) const
{
memcpy(dest, source, uncompressed_size);
}
void registerCodecNone(CompressionCodecFactory & factory)
{
factory.registerSimpleCompressionCodec("NONE", static_cast<char>(CompressionMethodByte::NONE), [&] ()
{
return std::make_shared<CompressionCodecNone>();
});
}
}

View File

@ -0,0 +1,25 @@
#pragma once
#include <IO/WriteBuffer.h>
#include <Compression/ICompressionCodec.h>
#include <IO/BufferWithOwnMemory.h>
#include <Parsers/StringRange.h>
namespace DB
{
class CompressionCodecNone : public ICompressionCodec
{
public:
UInt8 getMethodByte() const override;
String getCodecDesc() const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;
};
}

View File

@ -0,0 +1,86 @@
#include <Compression/CompressionCodecZSTD.h>
#include <Compression/CompressionInfo.h>
#include <IO/ReadHelpers.h>
#include <Compression/CompressionFactory.h>
#include <zstd.h>
#include <Core/Field.h>
#include <Parsers/IAST.h>
#include <Parsers/ASTLiteral.h>
#include <Common/typeid_cast.h>
#include <IO/WriteHelpers.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_COMPRESS;
extern const int CANNOT_DECOMPRESS;
extern const int ILLEGAL_SYNTAX_FOR_CODEC_TYPE;
extern const int ILLEGAL_CODEC_PARAMETER;
}
UInt8 CompressionCodecZSTD::getMethodByte() const
{
return static_cast<UInt8>(CompressionMethodByte::ZSTD);
}
String CompressionCodecZSTD::getCodecDesc() const
{
return "ZSTD";
}
UInt32 CompressionCodecZSTD::getCompressedDataSize(UInt32 uncompressed_size) const
{
return ZSTD_compressBound(uncompressed_size);
}
UInt32 CompressionCodecZSTD::doCompressData(const char * source, UInt32 source_size, char * dest) const
{
size_t compressed_size = ZSTD_compress(dest, ZSTD_compressBound(source_size), source, source_size, level);
if (ZSTD_isError(compressed_size))
throw Exception("Cannot compress block with ZSTD: " + std::string(ZSTD_getErrorName(compressed_size)), ErrorCodes::CANNOT_COMPRESS);
return compressed_size;
}
void CompressionCodecZSTD::doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const
{
size_t res = ZSTD_decompress(dest, uncompressed_size, source, source_size);
if (ZSTD_isError(res))
throw Exception("Cannot ZSTD_decompress: " + std::string(ZSTD_getErrorName(res)), ErrorCodes::CANNOT_DECOMPRESS);
}
CompressionCodecZSTD::CompressionCodecZSTD(int level_)
:level(level_)
{
}
void registerCodecZSTD(CompressionCodecFactory & factory)
{
UInt8 method_code = static_cast<char>(CompressionMethodByte::ZSTD);
factory.registerCompressionCodec("ZSTD", method_code, [&](const ASTPtr & arguments) -> CompressionCodecPtr
{
int level = CompressionCodecZSTD::ZSTD_DEFAULT_LEVEL;
if (arguments && !arguments->children.empty())
{
if (arguments->children.size() > 1)
throw Exception("ZSTD codec must have 1 parameter, given " + std::to_string(arguments->children.size()), ErrorCodes::ILLEGAL_SYNTAX_FOR_CODEC_TYPE);
const auto children = arguments->children;
const ASTLiteral * literal = static_cast<const ASTLiteral *>(children[0].get());
level = literal->value.safeGet<UInt64>();
if (level > ZSTD_maxCLevel())
throw Exception("ZSTD codec can't have level more that " + toString(ZSTD_maxCLevel()) + ", given " + toString(level), ErrorCodes::ILLEGAL_CODEC_PARAMETER);
}
return std::make_shared<CompressionCodecZSTD>(level);
});
}
}
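As with LZ4, the ZSTD codec delegates to the raw library calls shown above (ZSTD_compressBound, ZSTD_compress, ZSTD_decompress, ZSTD_isError). A stand-alone round trip with that API at the default level 1; it assumes libzstd is installed and linked with -lzstd:

#include <zstd.h>

#include <iostream>
#include <string>
#include <vector>

int main()
{
    std::string source(1000, 'z');
    std::vector<char> compressed(ZSTD_compressBound(source.size()));
    size_t compressed_size = ZSTD_compress(compressed.data(), compressed.size(),
                                           source.data(), source.size(), /*level=*/1);
    if (ZSTD_isError(compressed_size))
        return 1;

    std::vector<char> restored(source.size());
    size_t restored_size = ZSTD_decompress(restored.data(), restored.size(),
                                           compressed.data(), compressed_size);
    if (ZSTD_isError(restored_size))
        return 1;

    std::cout << compressed_size << " -> " << restored_size << '\n';
}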

View File

@ -0,0 +1,33 @@
#pragma once
#include <IO/WriteBuffer.h>
#include <Compression/ICompressionCodec.h>
#include <IO/BufferWithOwnMemory.h>
#include <Parsers/StringRange.h>
namespace DB
{
class CompressionCodecZSTD : public ICompressionCodec
{
public:
static constexpr auto ZSTD_DEFAULT_LEVEL = 1;
CompressionCodecZSTD(int level_);
UInt8 getMethodByte() const override;
String getCodecDesc() const override;
UInt32 getCompressedDataSize(UInt32 uncompressed_size) const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;
private:
int level;
};
}

View File

@ -0,0 +1,151 @@
#include <Compression/CompressionFactory.h>
#include <Parsers/parseQuery.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Common/typeid_cast.h>
#include <Poco/String.h>
#include <IO/ReadBuffer.h>
#include <Parsers/queryToString.h>
#include <Compression/CompressionCodecMultiple.h>
#include <Compression/CompressionCodecLZ4.h>
#include <Compression/CompressionCodecNone.h>
#include <IO/WriteHelpers.h>
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_CODEC;
extern const int UNEXPECTED_AST_STRUCTURE;
extern const int ILLEGAL_SYNTAX_FOR_CODEC_TYPE;
extern const int DATA_TYPE_CANNOT_HAVE_ARGUMENTS;
}
CompressionCodecPtr CompressionCodecFactory::getDefaultCodec() const
{
return default_codec;
}
CompressionCodecPtr CompressionCodecFactory::get(const String & family_name, std::optional<int> level) const
{
if (level)
{
auto literal = std::make_shared<ASTLiteral>(static_cast<UInt64>(*level));
return get(makeASTFunction("CODEC", makeASTFunction(Poco::toUpper(family_name), literal)));
}
else
{
auto identifier = std::make_shared<ASTIdentifier>(Poco::toUpper(family_name));
return get(makeASTFunction("CODEC", identifier));
}
}
CompressionCodecPtr CompressionCodecFactory::get(const std::vector<CodecNameWithLevel> & codecs) const
{
Codecs result;
for (const auto & [codec_name, level] : codecs)
result.push_back(get(codec_name, level));
if (result.size() == 1)
return result.back();
return std::make_shared<CompressionCodecMultiple>(result);
}
CompressionCodecPtr CompressionCodecFactory::get(const ASTPtr & ast) const
{
if (const auto * func = typeid_cast<const ASTFunction *>(ast.get()))
{
Codecs codecs;
codecs.reserve(func->arguments->children.size());
for (const auto & inner_codec_ast : func->arguments->children)
{
if (const auto * family_name = typeid_cast<const ASTIdentifier *>(inner_codec_ast.get()))
codecs.emplace_back(getImpl(family_name->name, {}));
else if (const auto * ast_func = typeid_cast<const ASTFunction *>(inner_codec_ast.get()))
codecs.emplace_back(getImpl(ast_func->name, ast_func->arguments));
else
throw Exception("Unexpected AST element for compression codec", ErrorCodes::UNEXPECTED_AST_STRUCTURE);
}
if (codecs.size() == 1)
return codecs.back();
else if (codecs.size() > 1)
return std::make_shared<CompressionCodecMultiple>(codecs);
}
throw Exception("Unknown codec family: " + queryToString(ast), ErrorCodes::UNKNOWN_CODEC);
}
CompressionCodecPtr CompressionCodecFactory::get(const UInt8 byte_code) const
{
const auto family_code_and_creator = family_code_with_codec.find(byte_code);
if (family_code_and_creator == family_code_with_codec.end())
throw Exception("Unknown codec family code : " + toString(byte_code), ErrorCodes::UNKNOWN_CODEC);
return family_code_and_creator->second({});
}
CompressionCodecPtr CompressionCodecFactory::getImpl(const String & family_name, const ASTPtr & arguments) const
{
if (family_name == "Multiple")
throw Exception("Codec MULTIPLE cannot be specified directly", ErrorCodes::UNKNOWN_CODEC);
const auto family_and_creator = family_name_with_codec.find(family_name);
if (family_and_creator == family_name_with_codec.end())
throw Exception("Unknown codec family: " + family_name, ErrorCodes::UNKNOWN_CODEC);
return family_and_creator->second(arguments);
}
void CompressionCodecFactory::registerCompressionCodec(const String & family_name, std::optional<UInt8> byte_code, Creator creator)
{
if (creator == nullptr)
throw Exception("CompressionCodecFactory: the codec family " + family_name + " has been provided a null constructor",
ErrorCodes::LOGICAL_ERROR);
if (!family_name_with_codec.emplace(family_name, creator).second)
throw Exception("CompressionCodecFactory: the codec family name '" + family_name + "' is not unique", ErrorCodes::LOGICAL_ERROR);
if (byte_code)
if (!family_code_with_codec.emplace(*byte_code, creator).second)
throw Exception("CompressionCodecFactory: the codec family name '" + family_name + "' is not unique", ErrorCodes::LOGICAL_ERROR);
}
void CompressionCodecFactory::registerSimpleCompressionCodec(const String & family_name, std::optional<UInt8> byte_code,
std::function<CompressionCodecPtr()> creator)
{
registerCompressionCodec(family_name, byte_code, [family_name, creator](const ASTPtr & ast)
{
if (ast)
throw Exception("Compression codec " + family_name + " cannot have arguments", ErrorCodes::DATA_TYPE_CANNOT_HAVE_ARGUMENTS);
return creator();
});
}
void registerCodecLZ4(CompressionCodecFactory & factory);
void registerCodecNone(CompressionCodecFactory & factory);
void registerCodecZSTD(CompressionCodecFactory & factory);
void registerCodecMultiple(CompressionCodecFactory & factory);
void registerCodecLZ4HC(CompressionCodecFactory & factory);
//void registerCodecDelta(CompressionCodecFactory & factory);
CompressionCodecFactory::CompressionCodecFactory()
{
default_codec = std::make_shared<CompressionCodecLZ4>();
registerCodecLZ4(*this);
registerCodecNone(*this);
registerCodecZSTD(*this);
registerCodecMultiple(*this);
registerCodecLZ4HC(*this);
// registerCodecDelta(*this);
}
}
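The factory keeps two maps, family name to creator and method byte to creator, and registerSimpleCompressionCodec just wraps a no-argument creator with a check that no AST arguments were passed. A minimal registry sketch of the same shape (toy Codec type and illustrative names, not the real factory):

#include <cstdint>
#include <functional>
#include <iostream>
#include <memory>
#include <optional>
#include <stdexcept>
#include <string>
#include <unordered_map>

struct Codec { std::string desc; };
using CodecPtr = std::shared_ptr<Codec>;
using Creator = std::function<CodecPtr()>;

struct MiniFactory
{
    std::unordered_map<std::string, Creator> by_name;
    std::unordered_map<uint8_t, Creator> by_code;

    void registerCodec(const std::string & name, std::optional<uint8_t> code, Creator creator)
    {
        if (!by_name.emplace(name, creator).second)
            throw std::logic_error("codec family name is not unique: " + name);
        if (code && !by_code.emplace(*code, creator).second)
            throw std::logic_error("codec family code is not unique");
    }

    CodecPtr get(const std::string & name) const
    {
        auto it = by_name.find(name);
        if (it == by_name.end())
            throw std::runtime_error("unknown codec family: " + name);
        return it->second();
    }
};

int main()
{
    MiniFactory factory;
    factory.registerCodec("LZ4", 0x82, [] { return std::make_shared<Codec>(Codec{"LZ4"}); });
    std::cout << factory.get("LZ4")->desc << '\n';
}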

View File

@ -0,0 +1,68 @@
#pragma once
#include <memory>
#include <functional>
#include <optional>
#include <unordered_map>
#include <ext/singleton.h>
#include <Common/IFactoryWithAliases.h>
#include <Compression/ICompressionCodec.h>
#include <Compression/CompressionInfo.h>
namespace DB
{
class ICompressionCodec;
using CompressionCodecPtr = std::shared_ptr<ICompressionCodec>;
using CodecNameWithLevel = std::pair<String, std::optional<int>>;
class IAST;
using ASTPtr = std::shared_ptr<IAST>;
/** Creates a codec object by name of compression algorithm family and parameters.
*/
class CompressionCodecFactory final : public ext::singleton<CompressionCodecFactory>
{
protected:
using Creator = std::function<CompressionCodecPtr(const ASTPtr & parameters)>;
using SimpleCreator = std::function<CompressionCodecPtr()>;
using CompressionCodecsDictionary = std::unordered_map<String, Creator>;
using CompressionCodecsCodeDictionary = std::unordered_map<UInt8, Creator>;
public:
/// Return default codec (currently LZ4)
CompressionCodecPtr getDefaultCodec() const;
/// Get codec by AST
CompressionCodecPtr get(const ASTPtr & ast) const;
/// Get codec by method byte (no params available)
CompressionCodecPtr get(const UInt8 byte_code) const;
/// For backward compatibility with config settings
CompressionCodecPtr get(const String & family_name, std::optional<int> level) const;
CompressionCodecPtr get(const std::vector<CodecNameWithLevel> & codecs) const;
/// Register codec with parameters
void registerCompressionCodec(const String & family_name, std::optional<UInt8> byte_code, Creator creator);
/// Register codec without parameters
void registerSimpleCompressionCodec(const String & family_name, std::optional<UInt8> byte_code, SimpleCreator creator);
protected:
CompressionCodecPtr getImpl(const String & family_name, const ASTPtr & arguments) const;
private:
CompressionCodecsDictionary family_name_with_codec;
CompressionCodecsCodeDictionary family_code_with_codec;
CompressionCodecPtr default_codec;
CompressionCodecFactory();
friend class ext::singleton<CompressionCodecFactory>;
};
}

View File

@ -2,25 +2,16 @@
#include <cstdint>
/** Common Defines */
/** Common defines for compression */
#define DBMS_MAX_COMPRESSED_SIZE 0x40000000ULL /// 1GB
/** one byte for method, 4 bytes for compressed size, 4 bytes for uncompressed size */
#define COMPRESSED_BLOCK_HEADER_SIZE 9
namespace DB
{
/** Compression method */
enum class CompressionMethod
{
LZ4 = 1,
LZ4HC = 2, /// The format is the same as for LZ4. The difference is only in compression.
ZSTD = 3, /// Experimental algorithm: https://github.com/Cyan4973/zstd
NONE = 4, /// No compression
};
/** The compressed block format is as follows:
*
* The first 16 bytes are the checksum from all other bytes of the block. Now only CityHash128 is used.
@ -47,6 +38,7 @@ enum class CompressionMethodByte : uint8_t
NONE = 0x02,
LZ4 = 0x82,
ZSTD = 0x90,
Multiple = 0x91,
};
}

View File

@ -0,0 +1,74 @@
#include "ICompressionCodec.h"
#include <Compression/LZ4_decompress_faster.h>
#include <common/unaligned.h>
#include <Common/hex.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadBufferFromFileBase.h>
#include <Common/typeid_cast.h>
#include <Compression/CompressionFactory.h>
#include <zstd.h>
namespace ProfileEvents
{
extern const Event ReadCompressedBytes;
extern const Event CompressedReadBufferBlocks;
extern const Event CompressedReadBufferBytes;
}
namespace DB
{
namespace ErrorCodes
{
extern const int CHECKSUM_DOESNT_MATCH;
extern const int TOO_LARGE_SIZE_COMPRESSED;
extern const int UNKNOWN_COMPRESSION_METHOD;
extern const int CANNOT_DECOMPRESS;
extern const int SEEK_POSITION_OUT_OF_BOUND;
}
UInt32 ICompressionCodec::compress(char * source, UInt32 source_size, char * dest) const
{
dest[0] = getMethodByte();
UInt8 header_size = getHeaderSize();
/// Write data from header_size
UInt32 compressed_bytes_written = doCompressData(source, source_size, &dest[header_size]);
unalignedStore<UInt32>(&dest[1], compressed_bytes_written + header_size);
unalignedStore<UInt32>(&dest[5], source_size);
return header_size + compressed_bytes_written;
}
UInt32 ICompressionCodec::decompress(char * source, UInt32 source_size, char * dest) const
{
UInt8 method = source[0];
if (method != getMethodByte())
throw Exception("Can't decompress data with codec byte " + toString(method) + " from codec with byte " + toString(method), ErrorCodes::CANNOT_DECOMPRESS);
UInt8 header_size = getHeaderSize();
UInt32 decompressed_size = unalignedLoad<UInt32>(&source[5]);
doDecompressData(&source[header_size], source_size - header_size, dest, decompressed_size);
return decompressed_size;
}
UInt32 ICompressionCodec::readCompressedBlockSize(const char * source)
{
return unalignedLoad<UInt32>(&source[1]);
}
UInt32 ICompressionCodec::readDecompressedBlockSize(const char * source)
{
return unalignedLoad<UInt32>(&source[5]);
}
UInt8 ICompressionCodec::readMethod(const char * source)
{
return static_cast<UInt8>(source[0]);
}
}

View File

@ -0,0 +1,73 @@
#pragma once
#include <memory>
#include <Core/Field.h>
#include <IO/ReadBuffer.h>
#include <IO/WriteBuffer.h>
#include <IO/BufferWithOwnMemory.h>
#include <Common/PODArray.h>
#include <DataTypes/IDataType.h>
#include <boost/noncopyable.hpp>
#include <IO/UncompressedCache.h>
#include <Compression/LZ4_decompress_faster.h>
#include <Compression/CompressionInfo.h>
namespace DB
{
class ICompressionCodec;
using CompressionCodecPtr = std::shared_ptr<ICompressionCodec>;
using Codecs = std::vector<CompressionCodecPtr>;
/**
* Represents interface for compression codecs like LZ4, ZSTD, etc.
*/
class ICompressionCodec : private boost::noncopyable
{
public:
virtual ~ICompressionCodec() = default;
/// Byte which indicates codec in compressed file
virtual UInt8 getMethodByte() const = 0;
/// Codec description, for example "ZSTD(2)" or "LZ4,LZ4HC(5)"
virtual String getCodecDesc() const = 0;
/// Compress bytes from uncompressed source to dest. Dest should have preallocated memory
virtual UInt32 compress(char * source, UInt32 source_size, char * dest) const;
/// Decompress bytes from compressed source to dest. Dest should have preallocated memory
virtual UInt32 decompress(char * source, UInt32 source_size, char * dest) const;
/// Number of bytes that will be used to compress uncompressed_size bytes with the current codec
virtual UInt32 getCompressedReserveSize(UInt32 uncompressed_size) const { return getHeaderSize() + getCompressedDataSize(uncompressed_size); }
/// Some codecs (LZ4, for example) require additional bytes at end of buffer
virtual UInt32 getAdditionalSizeAtTheEndOfBuffer() const { return 0; }
/// Size of header in compressed data on disk
static UInt8 getHeaderSize() { return COMPRESSED_BLOCK_HEADER_SIZE; }
/// Read size of compressed block from compressed source
static UInt32 readCompressedBlockSize(const char * source);
/// Read size of decompressed block from compressed source
static UInt32 readDecompressedBlockSize(const char * source);
/// Read method byte from compressed source
static UInt8 readMethod(const char * source);
protected:
/// Return size of compressed data without header
virtual UInt32 getCompressedDataSize(UInt32 uncompressed_size) const { return uncompressed_size; }
/// Actually compress data, without header
virtual UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const = 0;
/// Actually decompress data without header
virtual void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const = 0;
};
}
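To illustrate how the pure virtual hooks of this interface are meant to be implemented, here is a minimal, hypothetical pass-through codec. The class name and method byte are made up for the sketch; this is not the real ClickHouse codec, just the smallest possible subclass of the header above.
#include <string.h>
#include <Compression/ICompressionCodec.h>
namespace DB
{
/// Hypothetical codec that stores data uncompressed; relies on the default
/// getCompressedDataSize() returning uncompressed_size for buffer reservation.
class CompressionCodecPassthrough : public ICompressionCodec
{
public:
    UInt8 getMethodByte() const override { return 0x02; } /// placeholder method byte
    String getCodecDesc() const override { return "PASSTHROUGH"; }
protected:
    UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override
    {
        memcpy(dest, source, source_size); /// no actual compression
        return source_size;
    }
    void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 /*uncompressed_size*/) const override
    {
        memcpy(dest, source, source_size);
    }
};
}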


@ -1,12 +1,11 @@
#include "LZ4_decompress_faster.h"
#include <string.h>
#include <iostream>
#include <random>
#include <algorithm>
#include <IO/LZ4_decompress_faster.h>
#include <Core/Defines.h>
#include <Common/Stopwatch.h>
#include <common/likely.h>
#include <common/Types.h>
#include <common/unaligned.h>


@ -0,0 +1,5 @@
add_executable (compressed_buffer compressed_buffer.cpp)
target_link_libraries (compressed_buffer PRIVATE clickhouse_compression clickhouse_common_io)
add_executable (cached_compressed_read_buffer cached_compressed_read_buffer.cpp)
target_link_libraries (cached_compressed_read_buffer PRIVATE clickhouse_compression clickhouse_common_io)


@ -2,7 +2,8 @@
#include <iomanip>
#include <limits>
#include <IO/CachedCompressedReadBuffer.h>
#include <Compression/CompressionFactory.h>
#include <Compression/CachedCompressedReadBuffer.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/copyData.h>


@ -9,8 +9,8 @@
#include <Common/Stopwatch.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/CompressedWriteBuffer.h>
#include <IO/CompressedReadBuffer.h>
#include <Compression/CompressedWriteBuffer.h>
#include <Compression/CompressedReadBuffer.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>


@ -267,55 +267,55 @@ inline bool_if_safe_conversion<A, B> equalsOp(A a, B b)
}
template <>
inline bool equalsOp<DB::Float64, DB::UInt64>(DB::Float64 f, DB::UInt64 u)
inline bool NO_SANITIZE_UNDEFINED equalsOp<DB::Float64, DB::UInt64>(DB::Float64 f, DB::UInt64 u)
{
return static_cast<DB::UInt64>(f) == u && f == static_cast<DB::Float64>(u);
}
template <>
inline bool equalsOp<DB::UInt64, DB::Float64>(DB::UInt64 u, DB::Float64 f)
inline bool NO_SANITIZE_UNDEFINED equalsOp<DB::UInt64, DB::Float64>(DB::UInt64 u, DB::Float64 f)
{
return u == static_cast<DB::UInt64>(f) && static_cast<DB::Float64>(u) == f;
}
template <>
inline bool equalsOp<DB::Float64, DB::Int64>(DB::Float64 f, DB::Int64 u)
inline bool NO_SANITIZE_UNDEFINED equalsOp<DB::Float64, DB::Int64>(DB::Float64 f, DB::Int64 u)
{
return static_cast<DB::Int64>(f) == u && f == static_cast<DB::Float64>(u);
}
template <>
inline bool equalsOp<DB::Int64, DB::Float64>(DB::Int64 u, DB::Float64 f)
inline bool NO_SANITIZE_UNDEFINED equalsOp<DB::Int64, DB::Float64>(DB::Int64 u, DB::Float64 f)
{
return u == static_cast<DB::Int64>(f) && static_cast<DB::Float64>(u) == f;
}
template <>
inline bool equalsOp<DB::Float32, DB::UInt64>(DB::Float32 f, DB::UInt64 u)
inline bool NO_SANITIZE_UNDEFINED equalsOp<DB::Float32, DB::UInt64>(DB::Float32 f, DB::UInt64 u)
{
return static_cast<DB::UInt64>(f) == u && f == static_cast<DB::Float32>(u);
}
template <>
inline bool equalsOp<DB::UInt64, DB::Float32>(DB::UInt64 u, DB::Float32 f)
inline bool NO_SANITIZE_UNDEFINED equalsOp<DB::UInt64, DB::Float32>(DB::UInt64 u, DB::Float32 f)
{
return u == static_cast<DB::UInt64>(f) && static_cast<DB::Float32>(u) == f;
}
template <>
inline bool equalsOp<DB::Float32, DB::Int64>(DB::Float32 f, DB::Int64 u)
inline bool NO_SANITIZE_UNDEFINED equalsOp<DB::Float32, DB::Int64>(DB::Float32 f, DB::Int64 u)
{
return static_cast<DB::Int64>(f) == u && f == static_cast<DB::Float32>(u);
}
template <>
inline bool equalsOp<DB::Int64, DB::Float32>(DB::Int64 u, DB::Float32 f)
inline bool NO_SANITIZE_UNDEFINED equalsOp<DB::Int64, DB::Float32>(DB::Int64 u, DB::Float32 f)
{
return u == static_cast<DB::Int64>(f) && static_cast<DB::Float32>(u) == f;
}
template <>
inline bool equalsOp<DB::UInt128, DB::Float64>(DB::UInt128 u, DB::Float64 f)
inline bool NO_SANITIZE_UNDEFINED equalsOp<DB::UInt128, DB::Float64>(DB::UInt128 u, DB::Float64 f)
{
return u.low == 0 && equalsOp(static_cast<UInt64>(u.high), f);
}
@ -338,7 +338,7 @@ inline bool equalsOp<DB::Float32, DB::UInt128>(DB::Float32 f, DB::UInt128 u)
return equalsOp(static_cast<DB::Float64>(f), u);
}
inline bool greaterOp(DB::Int128 i, DB::Float64 f)
inline bool NO_SANITIZE_UNDEFINED greaterOp(DB::Int128 i, DB::Float64 f)
{
static constexpr __int128 min_int128 = __int128(0x8000000000000000ll) << 64;
static constexpr __int128 max_int128 = (__int128(0x7fffffffffffffffll) << 64) + 0xffffffffffffffffll;
@ -350,7 +350,7 @@ inline bool greaterOp(DB::Int128 i, DB::Float64 f)
|| (f < static_cast<DB::Float64>(max_int128) && i > static_cast<DB::Int128>(f));
}
inline bool greaterOp(DB::Float64 f, DB::Int128 i)
inline bool NO_SANITIZE_UNDEFINED greaterOp(DB::Float64 f, DB::Int128 i)
{
static constexpr __int128 min_int128 = __int128(0x8000000000000000ll) << 64;
static constexpr __int128 max_int128 = (__int128(0x7fffffffffffffffll) << 64) + 0xffffffffffffffffll;
@ -365,8 +365,8 @@ inline bool greaterOp(DB::Float64 f, DB::Int128 i)
inline bool greaterOp(DB::Int128 i, DB::Float32 f) { return greaterOp(i, static_cast<DB::Float64>(f)); }
inline bool greaterOp(DB::Float32 f, DB::Int128 i) { return greaterOp(static_cast<DB::Float64>(f), i); }
inline bool equalsOp(DB::Int128 i, DB::Float64 f) { return i == static_cast<DB::Int128>(f) && static_cast<DB::Float64>(i) == f; }
inline bool equalsOp(DB::Int128 i, DB::Float32 f) { return i == static_cast<DB::Int128>(f) && static_cast<DB::Float32>(i) == f; }
inline bool NO_SANITIZE_UNDEFINED equalsOp(DB::Int128 i, DB::Float64 f) { return i == static_cast<DB::Int128>(f) && static_cast<DB::Float64>(i) == f; }
inline bool NO_SANITIZE_UNDEFINED equalsOp(DB::Int128 i, DB::Float32 f) { return i == static_cast<DB::Int128>(f) && static_cast<DB::Float32>(i) == f; }
inline bool equalsOp(DB::Float64 f, DB::Int128 i) { return equalsOp(i, f); }
inline bool equalsOp(DB::Float32 f, DB::Int128 i) { return equalsOp(i, f); }
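The round-trip casts in these specializations exist because a plain promotion to floating point can silently lose integer precision, and the casts themselves can trigger UBSan, which is why they are now marked NO_SANITIZE_UNDEFINED. A small standalone illustration of the pattern (local names, not part of the patch):
#include <cstdint>
#include <iostream>
/// Both directions must agree: cast the float down and the integer up,
/// mirroring the equalsOp specializations above.
static bool equalsExact(double f, uint64_t u)
{
    return static_cast<uint64_t>(f) == u && f == static_cast<double>(u);
}
int main()
{
    uint64_t u = 10000000000000001ULL;            /// not exactly representable as double
    double f = static_cast<double>(u);            /// rounds to a neighbouring value
    std::cout << (f == static_cast<double>(u)) << '\n';  /// 1: naive promotion says "equal"
    std::cout << equalsExact(f, u) << '\n';               /// 0: round-trip check catches the loss
    return 0;
}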


@ -104,3 +104,13 @@
#elif defined(__SANITIZE_THREAD__)
#define THREAD_SANITIZER 1
#endif
/// Explicitly allow undefined behaviour for certain functions. Use it as a function attribute.
/// It is useful in cases when the compiler cannot see (and exploit) the undefined behaviour, but UBSan can.
/// Example: multiplication of signed integers with a possibility of overflow when both sides are from user input.
#if defined(__clang__)
#define NO_SANITIZE_UNDEFINED __attribute__((__no_sanitize__("undefined")))
#else
/// It does not work in GCC. GCC 7 cannot recognize this attribute and GCC 8 simply ignores it.
#define NO_SANITIZE_UNDEFINED
#endif
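A hypothetical standalone example of how the attribute is meant to be applied. The macro is repeated here so the snippet compiles on its own, and the deliberately overflowing multiplication is exactly the kind of operation the attribute exempts from UBSan reports under clang:
#include <cstdint>
#include <iostream>
#if defined(__clang__)
    #define NO_SANITIZE_UNDEFINED __attribute__((__no_sanitize__("undefined")))
#else
    #define NO_SANITIZE_UNDEFINED
#endif
/// Deliberately allowed to overflow; the attribute keeps UBSan quiet for this one function.
static NO_SANITIZE_UNDEFINED int64_t mulWrapping(int64_t a, int64_t b)
{
    return a * b;
}
int main()
{
    std::cout << mulWrapping(INT64_MAX, 2) << '\n';
    return 0;
}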


@ -2,7 +2,7 @@
#include <Interpreters/Aggregator.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/CompressedReadBuffer.h>
#include <Compression/CompressedReadBuffer.h>
#include <DataStreams/IProfilingBlockInputStream.h>


@ -1,7 +1,6 @@
#pragma once
#include <DataStreams/IProfilingBlockInputStream.h>
#include <Common/Arena.h>
#include <Interpreters/Aggregator.h>
#include <Core/ColumnNumbers.h>


@ -5,7 +5,7 @@
#include <DataStreams/processConstants.h>
#include <Common/formatReadable.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/CompressedWriteBuffer.h>
#include <Compression/CompressedWriteBuffer.h>
#include <Interpreters/sortBlock.h>


@ -12,7 +12,7 @@
#include <DataStreams/NativeBlockInputStream.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/CompressedReadBuffer.h>
#include <Compression/CompressedReadBuffer.h>
namespace DB


@ -1,5 +1,6 @@
#include <queue>
#include <iomanip>
#include <sstream>
#include <DataStreams/MergingSortedBlockInputStream.h>
@ -291,11 +292,18 @@ void MergingSortedBlockInputStream::readSuffixImpl()
const BlockStreamProfileInfo & profile_info = getProfileInfo();
double seconds = profile_info.total_stopwatch.elapsedSeconds();
LOG_DEBUG(log, std::fixed << std::setprecision(2)
std::stringstream message;
message << std::fixed << std::setprecision(2)
<< "Merge sorted " << profile_info.blocks << " blocks, " << profile_info.rows << " rows"
<< " in " << seconds << " sec., "
<< " in " << seconds << " sec.";
if (seconds)
message << ", "
<< profile_info.rows / seconds << " rows/sec., "
<< profile_info.bytes / 1000000.0 / seconds << " MB/sec.");
<< profile_info.bytes / 1000000.0 / seconds << " MB/sec.";
LOG_DEBUG(log, message.str());
}
}
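The point of this change is that throughput is only reported when the elapsed time is non-zero, so a merge that finishes within timer resolution no longer divides by zero inside the log statement. A standalone sketch of the same pattern with made-up numbers:
#include <cstddef>
#include <iomanip>
#include <iostream>
#include <sstream>
int main()
{
    size_t blocks = 12, rows = 1000000, bytes = 64000000;
    double seconds = 0.0; /// e.g. the merge finished within timer resolution
    std::stringstream message;
    message << std::fixed << std::setprecision(2)
        << "Merge sorted " << blocks << " blocks, " << rows << " rows"
        << " in " << seconds << " sec.";
    if (seconds)
        message << ", "
            << rows / seconds << " rows/sec., "
            << bytes / 1000000.0 / seconds << " MB/sec.";
    std::cout << message.str() << '\n';
    return 0;
}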
