data_part instead of volume in IMergeTreeDataPartWriter (as it done in IMergeTreeReader)

This commit is contained in:
Gleb Novikov 2020-05-10 16:33:27 +03:00
parent 57b6fb6d80
commit 1235c36f3e
8 changed files with 24 additions and 37 deletions

View File

@ -63,18 +63,16 @@ void IMergeTreeDataPartWriter::Stream::addToChecksums(MergeTreeData::DataPart::C
IMergeTreeDataPartWriter::IMergeTreeDataPartWriter(
const VolumePtr & volume_,
const String & part_path_,
const MergeTreeData & storage_,
const MergeTreeData::DataPartPtr & data_part_,
const NamesAndTypesList & columns_list_,
const std::vector<MergeTreeIndexPtr> & indices_to_recalc_,
const String & marks_file_extension_,
const CompressionCodecPtr & default_codec_,
const MergeTreeWriterSettings & settings_,
const MergeTreeIndexGranularity & index_granularity_)
: volume(volume_)
, part_path(part_path_)
, storage(storage_)
: data_part(data_part_)
, part_path(data_part_->getFullRelativePath())
, storage(data_part_->storage)
, columns_list(columns_list_)
, marks_file_extension(marks_file_extension_)
, index_granularity(index_granularity_)
@ -87,8 +85,8 @@ IMergeTreeDataPartWriter::IMergeTreeDataPartWriter(
if (settings.blocks_are_granules_size && !index_granularity.empty())
throw Exception("Can't take information about index granularity from blocks, when non empty index_granularity array specified", ErrorCodes::LOGICAL_ERROR);
if (!volume->getDisk()->exists(part_path))
volume->getDisk()->createDirectories(part_path);
if (!data_part->volume->getDisk()->exists(part_path))
data_part->volume->getDisk()->createDirectories(part_path);
}
IMergeTreeDataPartWriter::~IMergeTreeDataPartWriter() = default;
@ -165,7 +163,7 @@ void IMergeTreeDataPartWriter::initPrimaryIndex()
{
if (storage.hasPrimaryKey())
{
index_file_stream = volume->getDisk()->writeFile(part_path + "primary.idx", DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite);
index_file_stream = data_part->volume->getDisk()->writeFile(part_path + "primary.idx", DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite);
index_stream = std::make_unique<HashingWriteBuffer>(*index_file_stream);
}
@ -180,7 +178,7 @@ void IMergeTreeDataPartWriter::initSkipIndices()
skip_indices_streams.emplace_back(
std::make_unique<IMergeTreeDataPartWriter::Stream>(
stream_name,
volume->getDisk(),
data_part->volume->getDisk(),
part_path + stream_name, INDEX_FILE_EXTENSION,
part_path + stream_name, marks_file_extension,
default_codec, settings.max_compress_block_size,

View File

@ -61,9 +61,7 @@ public:
using StreamPtr = std::unique_ptr<Stream>;
IMergeTreeDataPartWriter(
const VolumePtr & volume,
const String & part_path,
const MergeTreeData & storage,
const MergeTreeData::DataPartPtr & data_part,
const NamesAndTypesList & columns_list,
const std::vector<MergeTreeIndexPtr> & indices_to_recalc,
const String & marks_file_extension,
@ -118,7 +116,7 @@ protected:
using SerializationState = IDataType::SerializeBinaryBulkStatePtr;
using SerializationStates = std::unordered_map<String, SerializationState>;
VolumePtr volume;
MergeTreeData::DataPartPtr data_part;
String part_path;
const MergeTreeData & storage;
NamesAndTypesList columns_list;

View File

@ -68,7 +68,7 @@ IMergeTreeDataPart::MergeTreeWriterPtr MergeTreeDataPartCompact::getWriter(
{ return *getColumnPosition(lhs.name) < *getColumnPosition(rhs.name); });
return std::make_unique<MergeTreeDataPartWriterCompact>(
volume, getFullRelativePath(), storage, ordered_columns_list, indices_to_recalc,
shared_from_this(), ordered_columns_list, indices_to_recalc,
index_granularity_info.marks_file_extension,
default_codec, writer_settings, computed_index_granularity);
}

View File

@ -59,7 +59,7 @@ IMergeTreeDataPart::MergeTreeWriterPtr MergeTreeDataPartWide::getWriter(
const MergeTreeIndexGranularity & computed_index_granularity) const
{
return std::make_unique<MergeTreeDataPartWriterWide>(
volume, getFullRelativePath(), storage, columns_list, indices_to_recalc,
shared_from_this(), columns_list, indices_to_recalc,
index_granularity_info.marks_file_extension,
default_codec, writer_settings, computed_index_granularity);
}

View File

@ -6,26 +6,23 @@ namespace DB
MergeTreeDataPartWriterCompact::MergeTreeDataPartWriterCompact(
const VolumePtr & volume_,
const String & part_path_,
const MergeTreeData & storage_,
const MergeTreeData::DataPartPtr & data_part_,
const NamesAndTypesList & columns_list_,
const std::vector<MergeTreeIndexPtr> & indices_to_recalc_,
const String & marks_file_extension_,
const CompressionCodecPtr & default_codec_,
const MergeTreeWriterSettings & settings_,
const MergeTreeIndexGranularity & index_granularity_)
: IMergeTreeDataPartWriter(volume_, part_path_,
storage_, columns_list_,
indices_to_recalc_, marks_file_extension_,
default_codec_, settings_, index_granularity_)
: IMergeTreeDataPartWriter(data_part_, columns_list_,
indices_to_recalc_, marks_file_extension_,
default_codec_, settings_, index_granularity_)
{
using DataPart = MergeTreeDataPartCompact;
String data_file_name = DataPart::DATA_FILE_NAME;
stream = std::make_unique<Stream>(
data_file_name,
volume->getDisk(),
data_part->volume->getDisk(),
part_path + data_file_name, DataPart::DATA_FILE_EXTENSION,
part_path + data_file_name, marks_file_extension,
default_codec,

View File

@ -8,9 +8,7 @@ class MergeTreeDataPartWriterCompact : public IMergeTreeDataPartWriter
{
public:
MergeTreeDataPartWriterCompact(
const VolumePtr & volume,
const String & part_path,
const MergeTreeData & storage,
const MergeTreeData::DataPartPtr & data_part,
const NamesAndTypesList & columns_list,
const std::vector<MergeTreeIndexPtr> & indices_to_recalc,
const String & marks_file_extension,

View File

@ -13,18 +13,16 @@ namespace
}
MergeTreeDataPartWriterWide::MergeTreeDataPartWriterWide(
const VolumePtr & volume_,
const String & part_path_,
const MergeTreeData & storage_,
const MergeTreeData::DataPartPtr & data_part_,
const NamesAndTypesList & columns_list_,
const std::vector<MergeTreeIndexPtr> & indices_to_recalc_,
const String & marks_file_extension_,
const CompressionCodecPtr & default_codec_,
const MergeTreeWriterSettings & settings_,
const MergeTreeIndexGranularity & index_granularity_)
: IMergeTreeDataPartWriter(volume_, part_path_,
storage_, columns_list_, indices_to_recalc_,
marks_file_extension_, default_codec_, settings_, index_granularity_)
: IMergeTreeDataPartWriter(data_part_, columns_list_,
indices_to_recalc_, marks_file_extension_,
default_codec_, settings_, index_granularity_)
{
const auto & columns = storage.getColumns();
for (const auto & it : columns_list)
@ -46,7 +44,7 @@ void MergeTreeDataPartWriterWide::addStreams(
column_streams[stream_name] = std::make_unique<Stream>(
stream_name,
volume->getDisk(),
data_part->volume->getDisk(),
part_path + stream_name, DATA_FILE_EXTENSION,
part_path + stream_name, marks_file_extension,
effective_codec,

View File

@ -11,9 +11,7 @@ public:
using ColumnToSize = std::map<std::string, UInt64>;
MergeTreeDataPartWriterWide(
const VolumePtr & volume,
const String & part_path,
const MergeTreeData & storage,
const MergeTreeData::DataPartPtr & data_part,
const NamesAndTypesList & columns_list,
const std::vector<MergeTreeIndexPtr> & indices_to_recalc,
const String & marks_file_extension,