Fixed CI errors

This commit is contained in:
Alexander Burmak 2019-12-23 19:57:16 +03:00
parent 0894ed9f33
commit 97f03ab064
3 changed files with 19 additions and 33 deletions

View File

@ -3,6 +3,7 @@
#include <Common/Exception.h> #include <Common/Exception.h>
#include <Common/StringUtils/StringUtils.h> #include <Common/StringUtils/StringUtils.h>
#include <Common/typeid_cast.h>
#include <IO/ReadBufferFromFile.h> #include <IO/ReadBufferFromFile.h>
#include <IO/WriteBufferFromFile.h> #include <IO/WriteBufferFromFile.h>
@ -18,12 +19,9 @@
#include <Columns/ColumnArray.h> #include <Columns/ColumnArray.h>
#include <Common/typeid_cast.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Poco/Path.h> #include <Poco/Path.h>
#include <Poco/DirectoryIterator.h>
#define DBMS_STORAGE_LOG_DATA_FILE_EXTENSION ".bin" #define DBMS_STORAGE_LOG_DATA_FILE_EXTENSION ".bin"
@ -86,8 +84,8 @@ private:
struct Stream struct Stream
{ {
Stream(const String & data_path, size_t offset, size_t max_read_buffer_size_) Stream(const DiskPtr & disk, const String & data_path, size_t offset, size_t max_read_buffer_size_)
: plain(data_path, std::min(static_cast<Poco::File::FileSize>(max_read_buffer_size_), Poco::File(data_path).getSize())), : plain(fullPath(disk, data_path), std::min(max_read_buffer_size_, disk->getFileSize(data_path))),
compressed(plain) compressed(plain)
{ {
if (offset) if (offset)
@ -142,11 +140,11 @@ private:
struct Stream struct Stream
{ {
Stream(const String & data_path, CompressionCodecPtr codec, size_t max_compress_block_size) : Stream(const DiskPtr & disk, const String & data_path, CompressionCodecPtr codec, size_t max_compress_block_size) :
plain(data_path, max_compress_block_size, O_APPEND | O_CREAT | O_WRONLY), plain(fullPath(disk, data_path), max_compress_block_size, O_APPEND | O_CREAT | O_WRONLY),
compressed(plain, std::move(codec), max_compress_block_size) compressed(plain, std::move(codec), max_compress_block_size),
plain_offset(disk->getFileSize(data_path))
{ {
plain_offset = Poco::File(data_path).getSize();
} }
WriteBufferFromFile plain; WriteBufferFromFile plain;
@ -251,7 +249,7 @@ void LogBlockInputStream::readData(const String & name, const IDataType & type,
offset = file_it->second.marks[mark_number].offset; offset = file_it->second.marks[mark_number].offset;
auto & data_file_path = file_it->second.data_file; auto & data_file_path = file_it->second.data_file;
auto it = streams.try_emplace(stream_name, data_file_path, offset, max_read_buffer_size).first; auto it = streams.try_emplace(stream_name, storage.disk, data_file_path, offset, max_read_buffer_size).first;
return &it->second.compressed; return &it->second.compressed;
}; };
}; };
@ -341,8 +339,7 @@ IDataType::OutputStreamGetter LogBlockOutputStream::createStreamGetter(const Str
void LogBlockOutputStream::writeData(const String & name, const IDataType & type, const IColumn & column, void LogBlockOutputStream::writeData(const String & name, const IDataType & type, const IColumn & column,
MarksForColumns & out_marks, MarksForColumns & out_marks, WrittenStreams & written_streams)
WrittenStreams & written_streams)
{ {
IDataType::SerializeBinaryBulkSettings settings; IDataType::SerializeBinaryBulkSettings settings;
@ -355,6 +352,7 @@ void LogBlockOutputStream::writeData(const String & name, const IDataType & type
const auto & columns = storage.getColumns(); const auto & columns = storage.getColumns();
streams.try_emplace( streams.try_emplace(
stream_name, stream_name,
storage.disk,
storage.files[stream_name].data_file, storage.files[stream_name].data_file,
columns.getCodecOrDefault(name), columns.getCodecOrDefault(name),
storage.max_compress_block_size); storage.max_compress_block_size);
@ -421,7 +419,7 @@ StorageLog::StorageLog(
const ColumnsDescription & columns_, const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_, const ConstraintsDescription & constraints_,
size_t max_compress_block_size_) size_t max_compress_block_size_)
: disk(std::move(disk_)), database_name(database_name_), table_name(table_name_), : disk(std::move(disk_)), database_name(database_name_), table_name(table_name_),
table_path("data/" + escapeForFileName(database_name_) + '/' + escapeForFileName(table_name_) + '/'), table_path("data/" + escapeForFileName(database_name_) + '/' + escapeForFileName(table_name_) + '/'),
max_compress_block_size(max_compress_block_size_), max_compress_block_size(max_compress_block_size_),
file_checker(disk, table_path + "sizes.json") file_checker(disk, table_path + "sizes.json")

View File

@ -6,11 +6,9 @@
#include <optional> #include <optional>
#include <Common/escapeForFileName.h> #include <Common/escapeForFileName.h>
#include <Common/Exception.h> #include <Common/Exception.h>
#include <IO/ReadBufferFromFile.h> #include <Compression/CompressedReadBuffer.h>
#include <IO/WriteBufferFromFile.h>
#include <Compression/CompressedReadBufferFromFile.h> #include <Compression/CompressedReadBufferFromFile.h>
#include <Compression/CompressedWriteBuffer.h> #include <Compression/CompressedWriteBuffer.h>
#include <IO/ReadHelpers.h> #include <IO/ReadHelpers.h>
@ -30,8 +28,6 @@
#include <Storages/StorageStripeLog.h> #include <Storages/StorageStripeLog.h>
#include <Storages/StorageFactory.h> #include <Storages/StorageFactory.h>
#include <Poco/DirectoryIterator.h>
#include <Compression/CompressedReadBuffer.h>
namespace DB namespace DB
@ -91,7 +87,6 @@ protected:
{ {
block_in.reset(); block_in.reset();
data_in.reset(); data_in.reset();
data_in_compressed.reset();
index.reset(); index.reset();
} }
} }
@ -113,8 +108,7 @@ private:
* - to save RAM when using a large number of sources. * - to save RAM when using a large number of sources.
*/ */
bool started = false; bool started = false;
std::unique_ptr<ReadBuffer> data_in_compressed; std::optional<CompressedReadBufferFromFile> data_in;
std::optional<CompressedReadBuffer> data_in;
std::optional<NativeBlockInputStream> block_in; std::optional<NativeBlockInputStream> block_in;
void start() void start()
@ -126,8 +120,7 @@ private:
String data_file = storage.table_path + "data.bin"; String data_file = storage.table_path + "data.bin";
size_t buffer_size = std::min(max_read_buffer_size, storage.disk->getFileSize(data_file)); size_t buffer_size = std::min(max_read_buffer_size, storage.disk->getFileSize(data_file));
data_in_compressed = storage.disk->read(data_file, buffer_size); data_in.emplace(fullPath(storage.disk, data_file), 0, 0, buffer_size);
data_in.emplace(*data_in_compressed);
block_in.emplace(*data_in, 0, index_begin, index_end); block_in.emplace(*data_in, 0, index_begin, index_end);
} }
} }
@ -259,8 +252,7 @@ BlockInputStreams StorageStripeLog::read(
if (!disk->exists(index_file)) if (!disk->exists(index_file))
return { std::make_shared<NullBlockInputStream>(getSampleBlockForColumns(column_names)) }; return { std::make_shared<NullBlockInputStream>(getSampleBlockForColumns(column_names)) };
std::unique_ptr<ReadBuffer> index_in_compressed = disk->read(index_file, INDEX_BUFFER_SIZE); CompressedReadBufferFromFile index_in(fullPath(disk, index_file), 0, 0, INDEX_BUFFER_SIZE);
CompressedReadBuffer index_in(*index_in_compressed);
std::shared_ptr<const IndexForNativeFormat> index{std::make_shared<IndexForNativeFormat>(index_in, column_names_set)}; std::shared_ptr<const IndexForNativeFormat> index{std::make_shared<IndexForNativeFormat>(index_in, column_names_set)};

View File

@ -8,11 +8,10 @@
#include <Poco/Util/XMLConfiguration.h> #include <Poco/Util/XMLConfiguration.h>
#include <Common/escapeForFileName.h> #include <Common/escapeForFileName.h>
#include <Common/Exception.h> #include <Common/Exception.h>
#include <Common/typeid_cast.h>
#include <IO/ReadBufferFromFile.h> #include <Compression/CompressionFactory.h>
#include <IO/WriteBufferFromFile.h>
#include <Compression/CompressedReadBuffer.h> #include <Compression/CompressedReadBuffer.h>
#include <Compression/CompressedWriteBuffer.h> #include <Compression/CompressedWriteBuffer.h>
#include <IO/ReadHelpers.h> #include <IO/ReadHelpers.h>
@ -25,9 +24,6 @@
#include <Columns/ColumnArray.h> #include <Columns/ColumnArray.h>
#include <Common/typeid_cast.h>
#include <Compression/CompressionFactory.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Storages/StorageTinyLog.h> #include <Storages/StorageTinyLog.h>
@ -84,8 +80,8 @@ private:
struct Stream struct Stream
{ {
Stream(const DiskPtr & disk, const String & data_path, size_t max_read_buffer_size) Stream(const DiskPtr & disk, const String & data_path, size_t max_read_buffer_size_)
: plain(disk->read(data_path, std::min(max_read_buffer_size, disk->getFileSize(data_path)))), : plain(disk->read(data_path, std::min(max_read_buffer_size_, disk->getFileSize(data_path)))),
compressed(*plain) compressed(*plain)
{ {
} }