diff --git a/dbms/CMakeLists.txt b/dbms/CMakeLists.txt index e4c7887f700..a89e7c4df41 100644 --- a/dbms/CMakeLists.txt +++ b/dbms/CMakeLists.txt @@ -64,7 +64,6 @@ set(dbms_sources) include(../cmake/dbms_glob_sources.cmake) add_headers_and_sources(clickhouse_common_io src/Common) -add_headers_and_sources(clickhouse_common_io src/Compression) add_headers_and_sources(clickhouse_common_io src/Common/HashTable) add_headers_and_sources(clickhouse_common_io src/IO) @@ -159,7 +158,6 @@ target_link_libraries (clickhouse_common_io PUBLIC common PRIVATE - clickhouse_parsers string_utils widechar_width ${LINK_LIBRARIES_ONLY_ON_X86_64} @@ -186,6 +184,8 @@ target_link_libraries (clickhouse_common_io ) target_link_libraries (dbms + PUBLIC + clickhouse_compression PRIVATE clickhouse_parsers clickhouse_common_config diff --git a/dbms/programs/compressor/CMakeLists.txt b/dbms/programs/compressor/CMakeLists.txt index 7aa2cad5708..bf3accfb8af 100644 --- a/dbms/programs/compressor/CMakeLists.txt +++ b/dbms/programs/compressor/CMakeLists.txt @@ -1,5 +1,5 @@ add_library (clickhouse-compressor-lib ${LINK_MODE} Compressor.cpp) -target_link_libraries (clickhouse-compressor-lib PRIVATE clickhouse_common_io ${Boost_PROGRAM_OPTIONS_LIBRARY}) +target_link_libraries (clickhouse-compressor-lib PRIVATE clickhouse_compression clickhouse_common_io ${Boost_PROGRAM_OPTIONS_LIBRARY}) if (CLICKHOUSE_SPLIT_BINARY) # Also in utils diff --git a/dbms/src/Common/tests/CMakeLists.txt b/dbms/src/Common/tests/CMakeLists.txt index 05984c9d42e..5e22d7e37cd 100644 --- a/dbms/src/Common/tests/CMakeLists.txt +++ b/dbms/src/Common/tests/CMakeLists.txt @@ -20,10 +20,10 @@ add_executable (small_table small_table.cpp) target_link_libraries (small_table PRIVATE clickhouse_common_io) add_executable (parallel_aggregation parallel_aggregation.cpp) -target_link_libraries (parallel_aggregation PRIVATE clickhouse_common_io) +target_link_libraries (parallel_aggregation PRIVATE clickhouse_compression clickhouse_common_io) add_executable (parallel_aggregation2 parallel_aggregation2.cpp) -target_link_libraries (parallel_aggregation2 PRIVATE clickhouse_common_io) +target_link_libraries (parallel_aggregation2 PRIVATE clickhouse_compression clickhouse_common_io) add_executable (int_hashes_perf int_hashes_perf.cpp AvalancheTest.cpp Random.cpp) target_link_libraries (int_hashes_perf PRIVATE clickhouse_common_io) @@ -42,7 +42,7 @@ add_executable (shell_command_test shell_command_test.cpp) target_link_libraries (shell_command_test PRIVATE clickhouse_common_io) add_executable (arena_with_free_lists arena_with_free_lists.cpp) -target_link_libraries (arena_with_free_lists PRIVATE clickhouse_common_io) +target_link_libraries (arena_with_free_lists PRIVATE clickhouse_compression clickhouse_common_io) add_executable (pod_array pod_array.cpp) target_link_libraries (pod_array PRIVATE clickhouse_common_io) @@ -61,7 +61,7 @@ target_link_libraries (space_saving PRIVATE clickhouse_common_io) add_executable (integer_hash_tables_and_hashes integer_hash_tables_and_hashes.cpp) target_include_directories (integer_hash_tables_and_hashes SYSTEM BEFORE PRIVATE ${SPARCEHASH_INCLUDE_DIR}) -target_link_libraries (integer_hash_tables_and_hashes PRIVATE clickhouse_common_io) +target_link_libraries (integer_hash_tables_and_hashes PRIVATE clickhouse_compression clickhouse_common_io) add_executable (allocator allocator.cpp) target_link_libraries (allocator PRIVATE clickhouse_common_io) diff --git a/dbms/src/Compression/CMakeLists.txt b/dbms/src/Compression/CMakeLists.txt index e69de29bb2d..6a0d6c90622 100644 --- a/dbms/src/Compression/CMakeLists.txt +++ b/dbms/src/Compression/CMakeLists.txt @@ -0,0 +1,9 @@ +include(${ClickHouse_SOURCE_DIR}/cmake/dbms_glob_sources.cmake) +add_headers_and_sources(clickhouse_compression .) +add_library(clickhouse_compression ${LINK_MODE} ${clickhouse_compression_headers} ${clickhouse_compression_sources}) +target_link_libraries(clickhouse_compression PRIVATE clickhouse_parsers clickhouse_common_io ${ZSTD_LIBRARY}) +target_include_directories(clickhouse_compression PUBLIC ${DBMS_INCLUDE_DIR}) + +#if(ENABLE_TESTS) +# add_subdirectory(tests) +#endif() diff --git a/dbms/src/IO/CachedCompressedReadBuffer.cpp b/dbms/src/Compression/CachedCompressedReadBuffer.cpp similarity index 98% rename from dbms/src/IO/CachedCompressedReadBuffer.cpp rename to dbms/src/Compression/CachedCompressedReadBuffer.cpp index 50c97edf1a3..bfaf9695f9b 100644 --- a/dbms/src/IO/CachedCompressedReadBuffer.cpp +++ b/dbms/src/Compression/CachedCompressedReadBuffer.cpp @@ -1,9 +1,10 @@ +#include "CachedCompressedReadBuffer.h" + #include -#include +#include "CachedCompressedReadBuffer.h" #include #include #include -#include "CachedCompressedReadBuffer.h" namespace DB diff --git a/dbms/src/Compression/CachedCompressedReadBuffer.h b/dbms/src/Compression/CachedCompressedReadBuffer.h new file mode 100644 index 00000000000..174ddb98587 --- /dev/null +++ b/dbms/src/Compression/CachedCompressedReadBuffer.h @@ -0,0 +1,58 @@ +#pragma once + +#include +#include +#include +#include "CompressedReadBufferBase.h" +#include +#include + + +namespace DB +{ + + +/** A buffer for reading from a compressed file using the cache of decompressed blocks. + * The external cache is passed as an argument to the constructor. + * Allows you to increase performance in cases where the same blocks are often read. + * Disadvantages: + * - in case you need to read a lot of data in a row, but of them only a part is cached, you have to do seek-and. + */ +class CachedCompressedReadBuffer : public CompressedReadBufferBase, public ReadBuffer +{ +private: + const std::string path; + UncompressedCache * cache; + size_t buf_size; + size_t estimated_size; + size_t aio_threshold; + + std::unique_ptr file_in; + size_t file_pos; + + /// A piece of data from the cache, or a piece of read data that we put into the cache. + UncompressedCache::MappedPtr owned_cell; + + void initInput(); + bool nextImpl() override; + + /// Passed into file_in. + ReadBufferFromFileBase::ProfileCallback profile_callback; + clockid_t clock_type {}; + +public: + CachedCompressedReadBuffer( + const std::string & path_, UncompressedCache * cache_, size_t estimated_size_, size_t aio_threshold_, + size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE); + + + void seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block); + + void setProfileCallback(const ReadBufferFromFileBase::ProfileCallback & profile_callback_, clockid_t clock_type_ = CLOCK_MONOTONIC_COARSE) + { + profile_callback = profile_callback_; + clock_type = clock_type_; + } +}; + +} diff --git a/dbms/src/IO/CompressedReadBuffer.cpp b/dbms/src/Compression/CompressedReadBuffer.cpp similarity index 98% rename from dbms/src/IO/CompressedReadBuffer.cpp rename to dbms/src/Compression/CompressedReadBuffer.cpp index cc540161c92..699a44455fc 100644 --- a/dbms/src/IO/CompressedReadBuffer.cpp +++ b/dbms/src/Compression/CompressedReadBuffer.cpp @@ -1,4 +1,4 @@ -#include +#include "CompressedReadBuffer.h" #include #include diff --git a/dbms/src/Compression/CompressedReadBuffer.h b/dbms/src/Compression/CompressedReadBuffer.h new file mode 100644 index 00000000000..1e8ea4784c7 --- /dev/null +++ b/dbms/src/Compression/CompressedReadBuffer.h @@ -0,0 +1,33 @@ +#pragma once + +#include "CompressedReadBufferBase.h" +#include +#include + + +namespace DB +{ + +class CompressedReadBuffer : public CompressedReadBufferBase, public BufferWithOwnMemory +{ +private: + size_t size_compressed = 0; + + bool nextImpl() override; + +public: + CompressedReadBuffer(ReadBuffer & in_) + : CompressedReadBufferBase(&in_), BufferWithOwnMemory(0) + { + } + + size_t readBig(char * to, size_t n) override; + + /// The compressed size of the current block. + size_t getSizeCompressed() const + { + return size_compressed; + } +}; + +} diff --git a/dbms/src/IO/CompressedReadBufferBase.cpp b/dbms/src/Compression/CompressedReadBufferBase.cpp similarity index 99% rename from dbms/src/IO/CompressedReadBufferBase.cpp rename to dbms/src/Compression/CompressedReadBufferBase.cpp index 5ac795a82e1..1ce83134f03 100644 --- a/dbms/src/IO/CompressedReadBufferBase.cpp +++ b/dbms/src/Compression/CompressedReadBufferBase.cpp @@ -1,4 +1,4 @@ -#include +#include "CompressedReadBufferBase.h" #include diff --git a/dbms/src/IO/CompressedReadBufferBase.h b/dbms/src/Compression/CompressedReadBufferBase.h similarity index 100% rename from dbms/src/IO/CompressedReadBufferBase.h rename to dbms/src/Compression/CompressedReadBufferBase.h diff --git a/dbms/src/IO/CompressedReadBufferFromFile.cpp b/dbms/src/Compression/CompressedReadBufferFromFile.cpp similarity index 98% rename from dbms/src/IO/CompressedReadBufferFromFile.cpp rename to dbms/src/Compression/CompressedReadBufferFromFile.cpp index 25008c205b5..75103435c13 100644 --- a/dbms/src/IO/CompressedReadBufferFromFile.cpp +++ b/dbms/src/Compression/CompressedReadBufferFromFile.cpp @@ -1,4 +1,5 @@ -#include +#include "CompressedReadBufferFromFile.h" + #include #include #include diff --git a/dbms/src/Compression/CompressedReadBufferFromFile.h b/dbms/src/Compression/CompressedReadBufferFromFile.h new file mode 100644 index 00000000000..288a66e321a --- /dev/null +++ b/dbms/src/Compression/CompressedReadBufferFromFile.h @@ -0,0 +1,45 @@ +#pragma once + +#include "CompressedReadBufferBase.h" +#include +#include +#include +#include + + +namespace DB +{ + + +/// Unlike CompressedReadBuffer, it can do seek. +class CompressedReadBufferFromFile : public CompressedReadBufferBase, public BufferWithOwnMemory +{ +private: + /** At any time, one of two things is true: + * a) size_compressed = 0 + * b) + * - `working_buffer` contains the entire block. + * - `file_in` points to the end of this block. + * - `size_compressed` contains the compressed size of this block. + */ + std::unique_ptr p_file_in; + ReadBufferFromFileBase & file_in; + size_t size_compressed = 0; + + bool nextImpl() override; + +public: + CompressedReadBufferFromFile( + const std::string & path, size_t estimated_size, size_t aio_threshold, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE); + + void seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block); + + size_t readBig(char * to, size_t n) override; + + void setProfileCallback(const ReadBufferFromFileBase::ProfileCallback & profile_callback_, clockid_t clock_type_ = CLOCK_MONOTONIC_COARSE) + { + file_in.setProfileCallback(profile_callback_, clock_type_); + } +}; + +} diff --git a/dbms/src/IO/CompressedWriteBuffer.cpp b/dbms/src/Compression/CompressedWriteBuffer.cpp similarity index 97% rename from dbms/src/IO/CompressedWriteBuffer.cpp rename to dbms/src/Compression/CompressedWriteBuffer.cpp index 24a2021555f..7fc8d5ab5f9 100644 --- a/dbms/src/IO/CompressedWriteBuffer.cpp +++ b/dbms/src/Compression/CompressedWriteBuffer.cpp @@ -8,7 +8,7 @@ #include #include -#include +#include "CompressedWriteBuffer.h" #include diff --git a/dbms/src/Compression/CompressedWriteBuffer.h b/dbms/src/Compression/CompressedWriteBuffer.h new file mode 100644 index 00000000000..a9612b463a5 --- /dev/null +++ b/dbms/src/Compression/CompressedWriteBuffer.h @@ -0,0 +1,55 @@ +#pragma once + +#include + +#include + +#include +#include +#include +#include + + +namespace DB +{ + +class CompressedWriteBuffer : public BufferWithOwnMemory +{ +private: + WriteBuffer & out; + CompressionCodecPtr codec; + + PODArray compressed_buffer; + + void nextImpl() override; + +public: + CompressedWriteBuffer( + WriteBuffer & out_, + CompressionCodecPtr codec_ = CompressionCodecFactory::instance().getDefaultCodec(), + size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE); + + /// The amount of compressed data + size_t getCompressedBytes() + { + nextIfAtEnd(); + return out.count(); + } + + /// How many uncompressed bytes were written to the buffer + size_t getUncompressedBytes() + { + return count(); + } + + /// How many bytes are in the buffer (not yet compressed) + size_t getRemainingBytes() + { + nextIfAtEnd(); + return offset(); + } + + ~CompressedWriteBuffer() override; +}; + +} diff --git a/dbms/src/IO/CompressionCodecWriteBuffer.h b/dbms/src/Compression/CompressionCodecWriteBuffer.h similarity index 100% rename from dbms/src/IO/CompressionCodecWriteBuffer.h rename to dbms/src/Compression/CompressionCodecWriteBuffer.h diff --git a/dbms/src/IO/CachedCompressedReadBuffer.h b/dbms/src/IO/CachedCompressedReadBuffer.h index 1b5e41972f3..936c1520407 100644 --- a/dbms/src/IO/CachedCompressedReadBuffer.h +++ b/dbms/src/IO/CachedCompressedReadBuffer.h @@ -1,58 +1 @@ -#pragma once - -#include -#include -#include -#include -#include -#include - - -namespace DB -{ - - -/** A buffer for reading from a compressed file using the cache of decompressed blocks. - * The external cache is passed as an argument to the constructor. - * Allows you to increase performance in cases where the same blocks are often read. - * Disadvantages: - * - in case you need to read a lot of data in a row, but of them only a part is cached, you have to do seek-and. - */ -class CachedCompressedReadBuffer : public CompressedReadBufferBase, public ReadBuffer -{ -private: - const std::string path; - UncompressedCache * cache; - size_t buf_size; - size_t estimated_size; - size_t aio_threshold; - - std::unique_ptr file_in; - size_t file_pos; - - /// A piece of data from the cache, or a piece of read data that we put into the cache. - UncompressedCache::MappedPtr owned_cell; - - void initInput(); - bool nextImpl() override; - - /// Passed into file_in. - ReadBufferFromFileBase::ProfileCallback profile_callback; - clockid_t clock_type {}; - -public: - CachedCompressedReadBuffer( - const std::string & path_, UncompressedCache * cache_, size_t estimated_size_, size_t aio_threshold_, - size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE); - - - void seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block); - - void setProfileCallback(const ReadBufferFromFileBase::ProfileCallback & profile_callback_, clockid_t clock_type_ = CLOCK_MONOTONIC_COARSE) - { - profile_callback = profile_callback_; - clock_type = clock_type_; - } -}; - -} +#include diff --git a/dbms/src/IO/CompressedReadBuffer.h b/dbms/src/IO/CompressedReadBuffer.h index 60ba29012b8..88ad9a2fad3 100644 --- a/dbms/src/IO/CompressedReadBuffer.h +++ b/dbms/src/IO/CompressedReadBuffer.h @@ -1,33 +1 @@ -#pragma once - -#include -#include -#include - - -namespace DB -{ - -class CompressedReadBuffer : public CompressedReadBufferBase, public BufferWithOwnMemory -{ -private: - size_t size_compressed = 0; - - bool nextImpl() override; - -public: - CompressedReadBuffer(ReadBuffer & in_) - : CompressedReadBufferBase(&in_), BufferWithOwnMemory(0) - { - } - - size_t readBig(char * to, size_t n) override; - - /// The compressed size of the current block. - size_t getSizeCompressed() const - { - return size_compressed; - } -}; - -} +#include diff --git a/dbms/src/IO/CompressedReadBufferFromFile.h b/dbms/src/IO/CompressedReadBufferFromFile.h index f1332ea4187..4a659366aaa 100644 --- a/dbms/src/IO/CompressedReadBufferFromFile.h +++ b/dbms/src/IO/CompressedReadBufferFromFile.h @@ -1,45 +1 @@ -#pragma once - -#include -#include -#include -#include -#include - - -namespace DB -{ - - -/// Unlike CompressedReadBuffer, it can do seek. -class CompressedReadBufferFromFile : public CompressedReadBufferBase, public BufferWithOwnMemory -{ -private: - /** At any time, one of two things is true: - * a) size_compressed = 0 - * b) - * - `working_buffer` contains the entire block. - * - `file_in` points to the end of this block. - * - `size_compressed` contains the compressed size of this block. - */ - std::unique_ptr p_file_in; - ReadBufferFromFileBase & file_in; - size_t size_compressed = 0; - - bool nextImpl() override; - -public: - CompressedReadBufferFromFile( - const std::string & path, size_t estimated_size, size_t aio_threshold, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE); - - void seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block); - - size_t readBig(char * to, size_t n) override; - - void setProfileCallback(const ReadBufferFromFileBase::ProfileCallback & profile_callback_, clockid_t clock_type_ = CLOCK_MONOTONIC_COARSE) - { - file_in.setProfileCallback(profile_callback_, clock_type_); - } -}; - -} +#include diff --git a/dbms/src/IO/CompressedWriteBuffer.h b/dbms/src/IO/CompressedWriteBuffer.h index a9612b463a5..c1dfa3f54b3 100644 --- a/dbms/src/IO/CompressedWriteBuffer.h +++ b/dbms/src/IO/CompressedWriteBuffer.h @@ -1,55 +1 @@ -#pragma once - -#include - -#include - -#include -#include -#include -#include - - -namespace DB -{ - -class CompressedWriteBuffer : public BufferWithOwnMemory -{ -private: - WriteBuffer & out; - CompressionCodecPtr codec; - - PODArray compressed_buffer; - - void nextImpl() override; - -public: - CompressedWriteBuffer( - WriteBuffer & out_, - CompressionCodecPtr codec_ = CompressionCodecFactory::instance().getDefaultCodec(), - size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE); - - /// The amount of compressed data - size_t getCompressedBytes() - { - nextIfAtEnd(); - return out.count(); - } - - /// How many uncompressed bytes were written to the buffer - size_t getUncompressedBytes() - { - return count(); - } - - /// How many bytes are in the buffer (not yet compressed) - size_t getRemainingBytes() - { - nextIfAtEnd(); - return offset(); - } - - ~CompressedWriteBuffer() override; -}; - -} +#include diff --git a/dbms/src/IO/tests/CMakeLists.txt b/dbms/src/IO/tests/CMakeLists.txt index 59a1e0088c2..79b30266281 100644 --- a/dbms/src/IO/tests/CMakeLists.txt +++ b/dbms/src/IO/tests/CMakeLists.txt @@ -19,8 +19,9 @@ target_link_libraries (valid_utf8_perf PRIVATE clickhouse_common_io) add_executable (valid_utf8 valid_utf8.cpp) target_link_libraries (valid_utf8 PRIVATE clickhouse_common_io) +# TODO move to Compression add_executable (compressed_buffer compressed_buffer.cpp) -target_link_libraries (compressed_buffer PRIVATE clickhouse_common_io) +target_link_libraries (compressed_buffer PRIVATE clickhouse_compression clickhouse_common_io) add_executable (var_uint var_uint.cpp) target_link_libraries (var_uint PRIVATE clickhouse_common_io) @@ -29,7 +30,7 @@ add_executable (read_escaped_string read_escaped_string.cpp) target_link_libraries (read_escaped_string PRIVATE clickhouse_common_io) add_executable (async_write async_write.cpp) -target_link_libraries (async_write PRIVATE clickhouse_common_io) +target_link_libraries (async_write PRIVATE clickhouse_compression clickhouse_common_io) add_executable (parse_int_perf parse_int_perf.cpp) target_link_libraries (parse_int_perf PRIVATE clickhouse_common_io) diff --git a/utils/check-marks/CMakeLists.txt b/utils/check-marks/CMakeLists.txt index 9c534364691..86cff8fb233 100644 --- a/utils/check-marks/CMakeLists.txt +++ b/utils/check-marks/CMakeLists.txt @@ -1,2 +1,2 @@ add_executable (check-marks main.cpp) -target_link_libraries(check-marks PRIVATE clickhouse_common_io ${Boost_PROGRAM_OPTIONS_LIBRARY}) +target_link_libraries(check-marks PRIVATE clickhouse_compression clickhouse_common_io ${Boost_PROGRAM_OPTIONS_LIBRARY})