diff --git a/dbms/src/Storages/StorageLog.cpp b/dbms/src/Storages/StorageLog.cpp index 890a8491fa6..5531ea14124 100644 --- a/dbms/src/Storages/StorageLog.cpp +++ b/dbms/src/Storages/StorageLog.cpp @@ -3,6 +3,7 @@ #include #include +#include #include #include @@ -18,12 +19,9 @@ #include -#include - #include #include -#include #define DBMS_STORAGE_LOG_DATA_FILE_EXTENSION ".bin" @@ -86,8 +84,8 @@ private: struct Stream { - Stream(const String & data_path, size_t offset, size_t max_read_buffer_size_) - : plain(data_path, std::min(static_cast(max_read_buffer_size_), Poco::File(data_path).getSize())), + Stream(const DiskPtr & disk, const String & data_path, size_t offset, size_t max_read_buffer_size_) + : plain(fullPath(disk, data_path), std::min(max_read_buffer_size_, disk->getFileSize(data_path))), compressed(plain) { if (offset) @@ -142,11 +140,11 @@ private: struct Stream { - Stream(const String & data_path, CompressionCodecPtr codec, size_t max_compress_block_size) : - plain(data_path, max_compress_block_size, O_APPEND | O_CREAT | O_WRONLY), - compressed(plain, std::move(codec), max_compress_block_size) + Stream(const DiskPtr & disk, const String & data_path, CompressionCodecPtr codec, size_t max_compress_block_size) : + plain(fullPath(disk, data_path), max_compress_block_size, O_APPEND | O_CREAT | O_WRONLY), + compressed(plain, std::move(codec), max_compress_block_size), + plain_offset(disk->getFileSize(data_path)) { - plain_offset = Poco::File(data_path).getSize(); } WriteBufferFromFile plain; @@ -251,7 +249,7 @@ void LogBlockInputStream::readData(const String & name, const IDataType & type, offset = file_it->second.marks[mark_number].offset; auto & data_file_path = file_it->second.data_file; - auto it = streams.try_emplace(stream_name, data_file_path, offset, max_read_buffer_size).first; + auto it = streams.try_emplace(stream_name, storage.disk, data_file_path, offset, max_read_buffer_size).first; return &it->second.compressed; }; }; @@ -341,8 +339,7 @@ IDataType::OutputStreamGetter LogBlockOutputStream::createStreamGetter(const Str void LogBlockOutputStream::writeData(const String & name, const IDataType & type, const IColumn & column, - MarksForColumns & out_marks, - WrittenStreams & written_streams) + MarksForColumns & out_marks, WrittenStreams & written_streams) { IDataType::SerializeBinaryBulkSettings settings; @@ -355,6 +352,7 @@ void LogBlockOutputStream::writeData(const String & name, const IDataType & type const auto & columns = storage.getColumns(); streams.try_emplace( stream_name, + storage.disk, storage.files[stream_name].data_file, columns.getCodecOrDefault(name), storage.max_compress_block_size); @@ -421,7 +419,7 @@ StorageLog::StorageLog( const ColumnsDescription & columns_, const ConstraintsDescription & constraints_, size_t max_compress_block_size_) - : disk(std::move(disk_)), database_name(database_name_), table_name(table_name_), + : disk(std::move(disk_)), database_name(database_name_), table_name(table_name_), table_path("data/" + escapeForFileName(database_name_) + '/' + escapeForFileName(table_name_) + '/'), max_compress_block_size(max_compress_block_size_), file_checker(disk, table_path + "sizes.json") diff --git a/dbms/src/Storages/StorageStripeLog.cpp b/dbms/src/Storages/StorageStripeLog.cpp index c44d6eac9ab..92b76afd182 100644 --- a/dbms/src/Storages/StorageStripeLog.cpp +++ b/dbms/src/Storages/StorageStripeLog.cpp @@ -6,11 +6,9 @@ #include #include - #include -#include -#include +#include #include #include #include @@ -30,8 +28,6 @@ #include #include -#include -#include namespace DB @@ -91,7 +87,6 @@ protected: { block_in.reset(); data_in.reset(); - data_in_compressed.reset(); index.reset(); } } @@ -113,8 +108,7 @@ private: * - to save RAM when using a large number of sources. */ bool started = false; - std::unique_ptr data_in_compressed; - std::optional data_in; + std::optional data_in; std::optional block_in; void start() @@ -126,8 +120,7 @@ private: String data_file = storage.table_path + "data.bin"; size_t buffer_size = std::min(max_read_buffer_size, storage.disk->getFileSize(data_file)); - data_in_compressed = storage.disk->read(data_file, buffer_size); - data_in.emplace(*data_in_compressed); + data_in.emplace(fullPath(storage.disk, data_file), 0, 0, buffer_size); block_in.emplace(*data_in, 0, index_begin, index_end); } } @@ -259,8 +252,7 @@ BlockInputStreams StorageStripeLog::read( if (!disk->exists(index_file)) return { std::make_shared(getSampleBlockForColumns(column_names)) }; - std::unique_ptr index_in_compressed = disk->read(index_file, INDEX_BUFFER_SIZE); - CompressedReadBuffer index_in(*index_in_compressed); + CompressedReadBufferFromFile index_in(fullPath(disk, index_file), 0, 0, INDEX_BUFFER_SIZE); std::shared_ptr index{std::make_shared(index_in, column_names_set)}; diff --git a/dbms/src/Storages/StorageTinyLog.cpp b/dbms/src/Storages/StorageTinyLog.cpp index 56aab9163c1..20084b74d61 100644 --- a/dbms/src/Storages/StorageTinyLog.cpp +++ b/dbms/src/Storages/StorageTinyLog.cpp @@ -8,11 +8,10 @@ #include #include - #include +#include -#include -#include +#include #include #include #include @@ -25,9 +24,6 @@ #include -#include -#include - #include #include @@ -84,8 +80,8 @@ private: struct Stream { - Stream(const DiskPtr & disk, const String & data_path, size_t max_read_buffer_size) - : plain(disk->read(data_path, std::min(max_read_buffer_size, disk->getFileSize(data_path)))), + Stream(const DiskPtr & disk, const String & data_path, size_t max_read_buffer_size_) + : plain(disk->read(data_path, std::min(max_read_buffer_size_, disk->getFileSize(data_path)))), compressed(*plain) { }