mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-24 08:32:02 +00:00
code cleanup
This commit is contained in:
parent
2797873921
commit
b0906abb0d
@ -5,9 +5,16 @@ namespace DB
|
||||
{
|
||||
struct AlterAnalysisResult
|
||||
{
|
||||
/// Expression for column type conversion.
|
||||
/// If no conversions are needed, expression=nullptr.
|
||||
ExpressionActionsPtr expression = nullptr;
|
||||
|
||||
/// Denotes if metadata must be changed even if no file should be overwritten
|
||||
/// (used for transformation-free changing of Enum values list).
|
||||
bool force_update_metadata = false;
|
||||
|
||||
std::map<String, const IDataType *> new_types;
|
||||
|
||||
/// For every column that need to be converted: source column name,
|
||||
/// column name of calculated expression for conversion.
|
||||
std::vector<std::pair<String, String>> conversions;
|
||||
|
@ -83,6 +83,8 @@ public:
|
||||
|
||||
virtual String getFileNameForColumn(const NameAndTypePair & column) const = 0;
|
||||
|
||||
/// Returns rename map of column files for the alter converting expression onto new table files.
|
||||
/// Files to be deleted are mapped to an empty string in rename map.
|
||||
virtual NameToNameMap createRenameMapForAlter(
|
||||
AlterAnalysisResult & /* analysis_result */,
|
||||
const NamesAndTypesList & /* old_columns */) const { return {}; }
|
||||
@ -306,7 +308,7 @@ public:
|
||||
protected:
|
||||
/// Columns description.
|
||||
NamesAndTypesList columns;
|
||||
Type part_type;
|
||||
const Type part_type;
|
||||
|
||||
void removeIfNeeded();
|
||||
|
||||
|
@ -1,7 +1,5 @@
|
||||
#pragma once
|
||||
|
||||
#include <Storages/MergeTree/MergeTreeIndexGranularity.h>
|
||||
#include <Storages/MergeTree/MergeTreeIndexGranularityInfo.h>
|
||||
#include <IO/WriteBufferFromFile.h>
|
||||
#include <IO/WriteBufferFromFileBase.h>
|
||||
#include <Compression/CompressedWriteBuffer.h>
|
||||
|
@ -6,18 +6,6 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
// constexpr auto DATA_FILE_EXTENSION = ".bin";
|
||||
// constexpr auto INDEX_FILE_EXTENSION = ".idx";
|
||||
}
|
||||
|
||||
|
||||
IMergedBlockOutputStream::IMergedBlockOutputStream(
|
||||
const MergeTreeDataPartPtr & data_part)
|
||||
: storage(data_part->storage)
|
||||
|
@ -1,10 +1,6 @@
|
||||
#pragma once
|
||||
|
||||
#include <Storages/MergeTree/MergeTreeIndexGranularity.h>
|
||||
#include <Storages/MergeTree/MergeTreeIndexGranularityInfo.h>
|
||||
#include <IO/WriteBufferFromFile.h>
|
||||
#include <Compression/CompressedWriteBuffer.h>
|
||||
#include <IO/HashingWriteBuffer.h>
|
||||
#include <Storages/MergeTree/MergeTreeData.h>
|
||||
#include <DataStreams/IBlockOutputStream.h>
|
||||
#include <Storages/MergeTree/IMergeTreeDataPart.h>
|
||||
@ -28,7 +24,6 @@ public:
|
||||
|
||||
protected:
|
||||
using SerializationState = IDataType::SerializeBinaryBulkStatePtr;
|
||||
using SerializationStates = std::vector<SerializationState>;
|
||||
|
||||
IDataType::OutputStreamGetter createStreamGetter(const String & name, WrittenOffsetColumns & offset_columns, bool skip_offsets);
|
||||
|
||||
|
@ -4,7 +4,6 @@
|
||||
#include <Core/NamesAndTypes.h>
|
||||
#include <Storages/MergeTree/RangesInDataPart.h>
|
||||
#include <Storages/MergeTree/MergeTreeRangeReader.h>
|
||||
// #include <Storages/MergeTree/MergeTreeIOSettings.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
|
@ -103,6 +103,7 @@ namespace ErrorCodes
|
||||
extern const int UNKNOWN_SETTING;
|
||||
extern const int READONLY_SETTING;
|
||||
extern const int ABORTED;
|
||||
extern const int UNKNOWN_PART_TYPE;
|
||||
}
|
||||
|
||||
|
||||
@ -2272,13 +2273,13 @@ void MergeTreeData::removePartsFromWorkingSet(const MergeTreeData::DataPartsVect
|
||||
|
||||
for (const DataPartPtr & part : remove)
|
||||
{
|
||||
if (part->state ==IMergeTreeDataPart::State::Committed)
|
||||
if (part->state == IMergeTreeDataPart::State::Committed)
|
||||
removePartContributionToColumnSizes(part);
|
||||
|
||||
if (part->state ==IMergeTreeDataPart::State::Committed || clear_without_timeout)
|
||||
if (part->state == IMergeTreeDataPart::State::Committed || clear_without_timeout)
|
||||
part->remove_time.store(remove_time, std::memory_order_relaxed);
|
||||
|
||||
if (part->state !=IMergeTreeDataPart::State::Outdated)
|
||||
if (part->state != IMergeTreeDataPart::State::Outdated)
|
||||
modifyPartState(part,IMergeTreeDataPart::State::Outdated);
|
||||
}
|
||||
}
|
||||
@ -2762,9 +2763,9 @@ void MergeTreeData::loadPartAndFixMetadata(MutableDataPartPtr part)
|
||||
String full_part_path = part->getFullPath();
|
||||
|
||||
/// Earlier the list of columns was written incorrectly. Delete it and re-create.
|
||||
if (isWidePart(part))
|
||||
if (Poco::File(full_part_path + "columns.txt").exists())
|
||||
Poco::File(full_part_path + "columns.txt").remove();
|
||||
/// But in compact parts we can't get list of columns without this file.
|
||||
if (isWidePart(part) && Poco::File(full_part_path + "columns.txt").exists())
|
||||
Poco::File(full_part_path + "columns.txt").remove();
|
||||
|
||||
part->loadColumnsChecksumsIndexes(false, true);
|
||||
part->modification_time = Poco::File(full_part_path).getLastModified().epochTime();
|
||||
@ -3746,7 +3747,7 @@ MergeTreeData::CurrentlyMovingPartsTagger::CurrentlyMovingPartsTagger(MergeTreeM
|
||||
: parts_to_move(std::move(moving_parts_)), data(data_)
|
||||
{
|
||||
for (const auto & moving_part : parts_to_move)
|
||||
if (!data.currently_moving_parts.emplace(moving_part.part->name).second)
|
||||
if (!data.currently_moving_parts.emplace(moving_part.part).second)
|
||||
throw Exception("Cannot move part '" + moving_part.part->name + "'. It's already moving.", ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
|
||||
@ -3756,9 +3757,9 @@ MergeTreeData::CurrentlyMovingPartsTagger::~CurrentlyMovingPartsTagger()
|
||||
for (const auto & moving_part : parts_to_move)
|
||||
{
|
||||
/// Something went completely wrong
|
||||
if (!data.currently_moving_parts.count(moving_part.part->name))
|
||||
if (!data.currently_moving_parts.count(moving_part.part))
|
||||
std::terminate();
|
||||
data.currently_moving_parts.erase(moving_part.part->name);
|
||||
data.currently_moving_parts.erase(moving_part.part);
|
||||
}
|
||||
}
|
||||
|
||||
@ -3802,7 +3803,7 @@ MergeTreeData::CurrentlyMovingPartsTagger MergeTreeData::selectPartsForMove()
|
||||
*reason = "part already assigned to background operation.";
|
||||
return false;
|
||||
}
|
||||
if (currently_moving_parts.count(part->name))
|
||||
if (currently_moving_parts.count(part))
|
||||
{
|
||||
*reason = "part is already moving.";
|
||||
return false;
|
||||
@ -3836,7 +3837,7 @@ MergeTreeData::CurrentlyMovingPartsTagger MergeTreeData::checkPartsForMove(const
|
||||
"Move is not possible: " + path_to_clone + part->name + " already exists",
|
||||
ErrorCodes::DIRECTORY_ALREADY_EXISTS);
|
||||
|
||||
if (currently_moving_parts.count(part->name) || partIsAssignedToBackgroundOperation(part))
|
||||
if (currently_moving_parts.count(part) || partIsAssignedToBackgroundOperation(part))
|
||||
throw Exception(
|
||||
"Cannot move part '" + part->name + "' because it's participating in background process",
|
||||
ErrorCodes::PART_IS_TEMPORARILY_LOCKED);
|
||||
|
@ -80,7 +80,7 @@ namespace ErrorCodes
|
||||
/// The same files as for month-partitioned tables, plus
|
||||
/// count.txt - contains total number of rows in this part.
|
||||
/// partition.dat - contains the value of the partitioning expression.
|
||||
/// minmax_[Column].idx - MinMax indexes (seeIMergeTreeDataPart::MinMaxIndex class) for the columns required by the partitioning expression.
|
||||
/// minmax_[Column].idx - MinMax indexes (see IMergeTreeDataPart::MinMaxIndex class) for the columns required by the partitioning expression.
|
||||
///
|
||||
/// Several modes are implemented. Modes determine additional actions during merge:
|
||||
/// - Ordinary - don't do anything special
|
||||
@ -184,7 +184,7 @@ public:
|
||||
|
||||
/// After this methods setColumns must be called
|
||||
MutableDataPartPtr createPart(const String & name,
|
||||
const MergeTreePartInfo & part_info,const DiskPtr & disk,
|
||||
const MergeTreePartInfo & part_info, const DiskPtr & disk,
|
||||
const NamesAndTypesList & columns,
|
||||
size_t bytes_on_disk, size_t rows_num,
|
||||
const String & relative_path) const;
|
||||
@ -789,7 +789,7 @@ public:
|
||||
/// if we decide to move some part to another disk, than we
|
||||
/// assuredly will choose this disk for containing part, which will appear
|
||||
/// as result of merge or mutation.
|
||||
NameSet currently_moving_parts;
|
||||
DataParts currently_moving_parts;
|
||||
|
||||
/// Mutex for currently_moving_parts
|
||||
mutable std::mutex moving_parts_mutex;
|
||||
@ -797,8 +797,6 @@ public:
|
||||
protected:
|
||||
|
||||
friend class IMergeTreeDataPart;
|
||||
friend class MergeTreeDataPartWide;
|
||||
friend class MergeTreeDataPartCompact;
|
||||
friend class MergeTreeDataMergerMutator;
|
||||
friend class ReplicatedMergeTreeAlterThread;
|
||||
friend struct ReplicatedMergeTreeTableMetadata;
|
||||
@ -935,8 +933,6 @@ protected:
|
||||
|
||||
void setStoragePolicy(const String & new_storage_policy_name, bool only_check = false);
|
||||
|
||||
/// Expression for column type conversion.
|
||||
/// If no conversions are needed, out_expression=nullptr.
|
||||
/// out_rename_map maps column files for the out_expression onto new table files.
|
||||
/// out_force_update_metadata denotes if metadata must be changed even if out_rename_map is empty (used
|
||||
/// for transformation-free changing of Enum values list).
|
||||
|
@ -1,30 +1,8 @@
|
||||
#include "MergeTreeDataPartCompact.h"
|
||||
|
||||
#include <optional>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <Compression/CompressedReadBuffer.h>
|
||||
#include <Compression/CompressedWriteBuffer.h>
|
||||
#include <IO/ReadBufferFromString.h>
|
||||
#include <IO/WriteBufferFromString.h>
|
||||
#include <IO/ReadBufferFromFile.h>
|
||||
#include <IO/HashingWriteBuffer.h>
|
||||
#include <Core/Defines.h>
|
||||
#include <Common/SipHash.h>
|
||||
#include <Common/escapeForFileName.h>
|
||||
#include <Common/StringUtils/StringUtils.h>
|
||||
#include <Common/localBackup.h>
|
||||
#include <Compression/CompressionInfo.h>
|
||||
#include <Storages/MergeTree/MergeTreeData.h>
|
||||
#include <Poco/File.h>
|
||||
#include <Poco/Path.h>
|
||||
#include <Poco/DirectoryIterator.h>
|
||||
#include <common/logger_useful.h>
|
||||
#include <common/JSON.h>
|
||||
|
||||
#include <Storages/MergeTree/MergeTreeReaderCompact.h>
|
||||
#include <Storages/MergeTree/MergeTreeDataPartWriterCompact.h>
|
||||
#include <Storages/MergeTree/IMergeTreeReader.h>
|
||||
#include <Poco/File.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -32,14 +10,8 @@ namespace DB
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int FILE_DOESNT_EXIST;
|
||||
extern const int NO_FILE_IN_DATA_PART;
|
||||
extern const int EXPECTED_END_OF_FILE;
|
||||
extern const int CORRUPTED_DATA;
|
||||
extern const int NOT_FOUND_EXPECTED_DATA_PART;
|
||||
extern const int BAD_SIZE_OF_FILE_IN_DATA_PART;
|
||||
extern const int BAD_TTL_FILE;
|
||||
extern const int CANNOT_UNLINK;
|
||||
}
|
||||
|
||||
|
||||
@ -69,12 +41,13 @@ IMergeTreeDataPart::MergeTreeReaderPtr MergeTreeDataPartCompact::getReader(
|
||||
MarkCache * mark_cache,
|
||||
const MergeTreeReaderSettings & reader_settings,
|
||||
const ValueSizeMap & avg_value_size_hints,
|
||||
const ReadBufferFromFileBase::ProfileCallback & /* profile_callback */) const
|
||||
const ReadBufferFromFileBase::ProfileCallback & profile_callback) const
|
||||
{
|
||||
/// FIXME maybe avoid shared_from_this
|
||||
return std::make_unique<MergeTreeReaderCompact>(
|
||||
shared_from_this(), columns_to_read, uncompressed_cache,
|
||||
mark_cache, mark_ranges, reader_settings, avg_value_size_hints);
|
||||
mark_cache, mark_ranges, reader_settings,
|
||||
avg_value_size_hints, profile_callback);
|
||||
}
|
||||
|
||||
IMergeTreeDataPart::MergeTreeWriterPtr MergeTreeDataPartCompact::getWriter(
|
||||
|
@ -1,24 +1,6 @@
|
||||
#pragma once
|
||||
|
||||
#include <Core/Row.h>
|
||||
#include <Core/Block.h>
|
||||
#include <Core/Types.h>
|
||||
#include <Core/NamesAndTypes.h>
|
||||
#include <Storages/MergeTree/MergeTreeIndexGranularity.h>
|
||||
#include <Storages/MergeTree/MergeTreeIndexGranularityInfo.h>
|
||||
#include <Storages/MergeTree/MergeTreeIndices.h>
|
||||
#include <Storages/MergeTree/MergeTreePartInfo.h>
|
||||
#include <Storages/MergeTree/MergeTreePartition.h>
|
||||
#include <Storages/MergeTree/MergeTreeDataPartChecksum.h>
|
||||
#include <Storages/MergeTree/MergeTreeDataPartTTLInfo.h>
|
||||
#include <Storages/MergeTree/KeyCondition.h>
|
||||
#include <Storages/MergeTree/IMergeTreeDataPart.h>
|
||||
#include <Columns/IColumn.h>
|
||||
|
||||
#include <Poco/Path.h>
|
||||
|
||||
#include <shared_mutex>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -26,7 +8,6 @@ namespace DB
|
||||
struct ColumnSize;
|
||||
class MergeTreeData;
|
||||
|
||||
|
||||
/// Description of the data part.
|
||||
class MergeTreeDataPartCompact : public IMergeTreeDataPart
|
||||
{
|
||||
@ -88,7 +69,4 @@ private:
|
||||
void loadIndexGranularity() override;
|
||||
};
|
||||
|
||||
|
||||
// using MergeTreeDataPartState =IMergeTreeDataPart::State;
|
||||
|
||||
}
|
||||
|
@ -1,29 +1,6 @@
|
||||
#include "MergeTreeDataPartWide.h"
|
||||
|
||||
#include <optional>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <Compression/CompressedReadBuffer.h>
|
||||
#include <Compression/CompressedWriteBuffer.h>
|
||||
#include <IO/ReadBufferFromString.h>
|
||||
#include <IO/WriteBufferFromString.h>
|
||||
#include <IO/ReadBufferFromFile.h>
|
||||
#include <IO/HashingWriteBuffer.h>
|
||||
#include <Core/Defines.h>
|
||||
#include <Common/SipHash.h>
|
||||
#include <Common/escapeForFileName.h>
|
||||
#include <Common/StringUtils/StringUtils.h>
|
||||
#include <Common/localBackup.h>
|
||||
#include <Compression/CompressionInfo.h>
|
||||
#include <Storages/MergeTree/MergeTreeData.h>
|
||||
#include <Poco/File.h>
|
||||
#include <Poco/Path.h>
|
||||
#include <Poco/DirectoryIterator.h>
|
||||
#include <common/logger_useful.h>
|
||||
#include <common/JSON.h>
|
||||
|
||||
#include <Storages/MergeTree/MergeTreeReaderWide.h>
|
||||
#include <Storages/MergeTree/IMergeTreeReader.h>
|
||||
#include <Storages/MergeTree/MergeTreeDataPartWriterWide.h>
|
||||
#include <Storages/MergeTree/IMergeTreeDataPartWriter.h>
|
||||
|
||||
@ -33,22 +10,11 @@ namespace DB
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int FILE_DOESNT_EXIST;
|
||||
extern const int NO_FILE_IN_DATA_PART;
|
||||
extern const int EXPECTED_END_OF_FILE;
|
||||
extern const int CORRUPTED_DATA;
|
||||
extern const int NOT_FOUND_EXPECTED_DATA_PART;
|
||||
extern const int BAD_SIZE_OF_FILE_IN_DATA_PART;
|
||||
extern const int BAD_TTL_FILE;
|
||||
extern const int CANNOT_UNLINK;
|
||||
}
|
||||
|
||||
|
||||
// static ReadBufferFromFile openForReading(const String & path)
|
||||
// {
|
||||
// return ReadBufferFromFile(path, std::min(static_cast<Poco::File::FileSize>(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(path).getSize()));
|
||||
// }
|
||||
|
||||
MergeTreeDataPartWide::MergeTreeDataPartWide(
|
||||
MergeTreeData & storage_,
|
||||
const String & name_,
|
||||
|
@ -1,24 +1,6 @@
|
||||
#pragma once
|
||||
|
||||
#include <Core/Row.h>
|
||||
#include <Core/Block.h>
|
||||
#include <Core/Types.h>
|
||||
#include <Core/NamesAndTypes.h>
|
||||
#include <Storages/MergeTree/MergeTreeIndexGranularity.h>
|
||||
#include <Storages/MergeTree/MergeTreeIndexGranularityInfo.h>
|
||||
#include <Storages/MergeTree/MergeTreeIndices.h>
|
||||
#include <Storages/MergeTree/MergeTreePartInfo.h>
|
||||
#include <Storages/MergeTree/MergeTreePartition.h>
|
||||
#include <Storages/MergeTree/MergeTreeDataPartChecksum.h>
|
||||
#include <Storages/MergeTree/MergeTreeDataPartTTLInfo.h>
|
||||
#include <Storages/MergeTree/KeyCondition.h>
|
||||
#include <Storages/MergeTree/IMergeTreeDataPart.h>
|
||||
#include <Columns/IColumn.h>
|
||||
|
||||
#include <Poco/Path.h>
|
||||
|
||||
#include <shared_mutex>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -91,6 +73,4 @@ private:
|
||||
ColumnSize getColumnSizeImpl(const String & name, const IDataType & type, std::unordered_set<String> * processed_substreams) const;
|
||||
};
|
||||
|
||||
// using MergeTreeDataPartState =IMergeTreeDataPart::State;
|
||||
|
||||
}
|
||||
|
@ -1,5 +1,4 @@
|
||||
#include <Storages/MergeTree/IMergeTreeDataPartWriter.h>
|
||||
#include <DataStreams/SquashingTransform.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
@ -80,20 +80,6 @@ void MergeTreeDataPartWriterWide::write(const Block & block,
|
||||
const IColumn::Permutation * permutation,
|
||||
const Block & primary_key_block, const Block & skip_indexes_block)
|
||||
{
|
||||
// if (serialization_states.empty())
|
||||
// {
|
||||
// serialization_states.reserve(columns_list.size());
|
||||
// WrittenOffsetColumns tmp_offset_columns;
|
||||
// IDataType::SerializeBinaryBulkSettings serialize_settings;
|
||||
|
||||
// for (const auto & col : columns_list)
|
||||
// {
|
||||
// serialize_settings.getter = createStreamGetter(col.name, tmp_offset_columns, false);
|
||||
// serialization_states.emplace_back(nullptr);
|
||||
// col.type->serializeBinaryBulkStatePrefix(serialize_settings, serialization_states.back());
|
||||
// }
|
||||
// }
|
||||
|
||||
/// Fill index granularity for this block
|
||||
/// if it's unknown (in case of insert data or horizontal merge,
|
||||
/// but not in case of vertical merge)
|
||||
@ -294,6 +280,7 @@ void MergeTreeDataPartWriterWide::finishDataSerialization(IMergeTreeDataPart::Ch
|
||||
{
|
||||
if (!serialization_states.empty())
|
||||
{
|
||||
/// FIXME maybe we need skip_offsets=false in some cases
|
||||
serialize_settings.getter = createStreamGetter(it->name, written_offset_columns ? *written_offset_columns : offset_columns);
|
||||
it->type->serializeBinaryBulkStateSuffix(serialize_settings, serialization_states[it->name]);
|
||||
}
|
||||
|
@ -6,6 +6,7 @@ namespace DB
|
||||
class MergeTreeDataPartWriterWide : public IMergeTreeDataPartWriter
|
||||
{
|
||||
public:
|
||||
|
||||
using ColumnToSize = std::map<std::string, UInt64>;
|
||||
|
||||
MergeTreeDataPartWriterWide(
|
||||
|
@ -11,6 +11,7 @@ namespace DB
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int NOT_IMPLEMENTED;
|
||||
extern const int UNKNOWN_PART_TYPE;
|
||||
}
|
||||
|
||||
std::optional<std::string> MergeTreeIndexGranularityInfo::getMrkExtensionFromFS(const std::string & path_to_part)
|
||||
|
@ -8,11 +8,6 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int UNKNOWN_PART_TYPE;
|
||||
}
|
||||
|
||||
class MergeTreeData;
|
||||
|
||||
/// Meta information about index granularity
|
||||
|
@ -17,7 +17,8 @@ namespace ErrorCodes
|
||||
|
||||
MergeTreeReaderCompact::MergeTreeReaderCompact(const MergeTreeData::DataPartPtr & data_part_,
|
||||
const NamesAndTypesList & columns_, UncompressedCache * uncompressed_cache_, MarkCache * mark_cache_,
|
||||
const MarkRanges & mark_ranges_, const MergeTreeReaderSettings & settings_, const ValueSizeMap & avg_value_size_hints_)
|
||||
const MarkRanges & mark_ranges_, const MergeTreeReaderSettings & settings_, const ValueSizeMap & avg_value_size_hints_,
|
||||
const ReadBufferFromFileBase::ProfileCallback & profile_callback_, clockid_t clock_type_)
|
||||
: IMergeTreeReader(data_part_, columns_
|
||||
, uncompressed_cache_, mark_cache_, mark_ranges_
|
||||
, settings_, avg_value_size_hints_)
|
||||
@ -31,8 +32,8 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(const MergeTreeData::DataPartPtr
|
||||
auto buffer = std::make_unique<CachedCompressedReadBuffer>(
|
||||
full_data_path, uncompressed_cache, 0, settings.min_bytes_to_use_direct_io, buffer_size);
|
||||
|
||||
// if (profile_callback)
|
||||
// buffer->setProfileCallback(profile_callback, clock_type);
|
||||
if (profile_callback_)
|
||||
buffer->setProfileCallback(profile_callback_, clock_type_);
|
||||
|
||||
cached_buffer = std::move(buffer);
|
||||
data_buffer = cached_buffer.get();
|
||||
@ -42,8 +43,8 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(const MergeTreeData::DataPartPtr
|
||||
auto buffer = std::make_unique<CompressedReadBufferFromFile>(
|
||||
full_data_path, 0, settings.min_bytes_to_use_direct_io, buffer_size);
|
||||
|
||||
// if (profile_callback)
|
||||
// buffer->setProfileCallback(profile_callback, clock_type);
|
||||
if (profile_callback_)
|
||||
buffer->setProfileCallback(profile_callback_, clock_type_);
|
||||
|
||||
non_cached_buffer = std::move(buffer);
|
||||
data_buffer = non_cached_buffer.get();
|
||||
@ -110,19 +111,13 @@ size_t MergeTreeReaderCompact::readRows(size_t from_mark, bool continue_reading,
|
||||
{
|
||||
size_t column_size_before_reading = column->size();
|
||||
|
||||
readData(*column, *type, from_mark, *column_positions[pos], rows_to_read, read_only_offsets[pos]);
|
||||
readData(name, *column, *type, from_mark, *column_positions[pos], rows_to_read, read_only_offsets[pos]);
|
||||
|
||||
size_t read_rows_in_column = column->size() - column_size_before_reading;
|
||||
|
||||
if (read_rows_in_column < rows_to_read)
|
||||
throw Exception("Cannot read all data in MergeTreeReaderCompact. Rows read: " + toString(read_rows_in_column) +
|
||||
". Rows expected: " + toString(rows_to_read) + ".", ErrorCodes::CANNOT_READ_ALL_DATA);
|
||||
|
||||
/// For elements of Nested, column_size_before_reading may be greater than column size
|
||||
/// if offsets are not empty and were already read, but elements are empty.
|
||||
/// FIXME
|
||||
// if (column->size())
|
||||
// read_rows_in_mark = std::max(read_rows, column->size() - column_size_before_reading);
|
||||
}
|
||||
catch (Exception & e)
|
||||
{
|
||||
@ -168,7 +163,7 @@ MergeTreeReaderCompact::ColumnPosition MergeTreeReaderCompact::findColumnForOffs
|
||||
|
||||
|
||||
void MergeTreeReaderCompact::readData(
|
||||
IColumn & column, const IDataType & type,
|
||||
const String & name, IColumn & column, const IDataType & type,
|
||||
size_t from_mark, size_t column_position, size_t rows_to_read, bool only_offsets)
|
||||
{
|
||||
if (!isContinuousReading(from_mark, column_position))
|
||||
@ -184,7 +179,7 @@ void MergeTreeReaderCompact::readData(
|
||||
|
||||
IDataType::DeserializeBinaryBulkSettings deserialize_settings;
|
||||
deserialize_settings.getter = buffer_getter;
|
||||
// deserialize_settings.avg_value_size_hint = avg_value_size_hints[name];
|
||||
deserialize_settings.avg_value_size_hint = avg_value_size_hints[name];
|
||||
deserialize_settings.position_independent_encoding = true;
|
||||
|
||||
IDataType::DeserializeBinaryBulkStatePtr state;
|
||||
|
@ -2,7 +2,6 @@
|
||||
|
||||
#include <Core/NamesAndTypes.h>
|
||||
#include <Storages/MergeTree/IMergeTreeReader.h>
|
||||
#include <Columns/ColumnArray.h>
|
||||
#include <port/clock.h>
|
||||
|
||||
|
||||
@ -19,7 +18,9 @@ public:
|
||||
MarkCache * mark_cache_,
|
||||
const MarkRanges & mark_ranges_,
|
||||
const MergeTreeReaderSettings & settings_,
|
||||
const ValueSizeMap & avg_value_size_hints_ = ValueSizeMap{});
|
||||
const ValueSizeMap & avg_value_size_hints_ = ValueSizeMap{},
|
||||
const ReadBufferFromFileBase::ProfileCallback & profile_callback_ = ReadBufferFromFileBase::ProfileCallback{},
|
||||
clockid_t clock_type_ = CLOCK_MONOTONIC_COARSE);
|
||||
|
||||
/// Return the number of rows has been read or zero if there is no columns to read.
|
||||
/// If continue_reading is true, continue reading from last state, otherwise seek to from_mark
|
||||
@ -37,7 +38,7 @@ private:
|
||||
MergeTreeMarksLoader marks_loader;
|
||||
|
||||
using ColumnPosition = std::optional<size_t>;
|
||||
/// Positions of columns in part structe.
|
||||
/// Positions of columns in part structure.
|
||||
std::vector<ColumnPosition> column_positions;
|
||||
/// Should we read full column or only it's offsets
|
||||
std::vector<bool> read_only_offsets;
|
||||
@ -48,7 +49,7 @@ private:
|
||||
void initMarksLoader();
|
||||
void seekToMark(size_t row_index, size_t column_index);
|
||||
|
||||
void readData(IColumn & column, const IDataType & type,
|
||||
void readData(const String & name, IColumn & column, const IDataType & type,
|
||||
size_t from_mark, size_t column_position, size_t rows_to_read, bool only_offsets = false);
|
||||
|
||||
ColumnPosition findColumnForOffsets(const String & column_name);
|
||||
|
@ -27,8 +27,7 @@ namespace ErrorCodes
|
||||
MergeTreeReaderWide::MergeTreeReaderWide(const MergeTreeData::DataPartPtr & data_part_,
|
||||
const NamesAndTypesList & columns_, UncompressedCache * uncompressed_cache_, MarkCache * mark_cache_,
|
||||
const MarkRanges & mark_ranges_, const MergeTreeReaderSettings & settings_, const ValueSizeMap & avg_value_size_hints_,
|
||||
const ReadBufferFromFileBase::ProfileCallback & profile_callback_,
|
||||
clockid_t clock_type_)
|
||||
const ReadBufferFromFileBase::ProfileCallback & profile_callback_, clockid_t clock_type_)
|
||||
: IMergeTreeReader(data_part_, columns_
|
||||
, uncompressed_cache_, mark_cache_, mark_ranges_
|
||||
, settings_, avg_value_size_hints_)
|
||||
|
@ -53,7 +53,7 @@ MergeTreeSequentialBlockInputStream::MergeTreeSequentialBlockInputStream(
|
||||
|
||||
MergeTreeReaderSettings reader_settings =
|
||||
{
|
||||
/// This is hack
|
||||
/// bytes to use AIO (this is hack)
|
||||
.min_bytes_to_use_direct_io = read_with_direct_io ? 1UL : std::numeric_limits<size_t>::max(),
|
||||
.max_read_buffer_size = DBMS_DEFAULT_BUFFER_SIZE,
|
||||
.save_marks_in_cache = false
|
||||
|
@ -1,12 +1,4 @@
|
||||
#include <Storages/MergeTree/MergedBlockOutputStream.h>
|
||||
#include <Storages/MergeTree/MergeTreeIndexGranularityInfo.h>
|
||||
#include <IO/createWriteBufferFromFileBase.h>
|
||||
#include <Common/escapeForFileName.h>
|
||||
#include <DataTypes/NestedUtils.h>
|
||||
#include <DataStreams/MarkInCompressedFile.h>
|
||||
#include <Common/StringUtils/StringUtils.h>
|
||||
#include <Common/typeid_cast.h>
|
||||
#include <Common/MemoryTracker.h>
|
||||
#include <Poco/File.h>
|
||||
|
||||
|
||||
@ -29,7 +21,8 @@ MergedBlockOutputStream::MergedBlockOutputStream(
|
||||
{
|
||||
MergeTreeWriterSettings writer_settings(data_part->storage.global_context.getSettings(),
|
||||
data_part->storage.canUseAdaptiveGranularity(), blocks_are_granules_size);
|
||||
writer = data_part->getWriter(columns_list, data_part->storage.getSkipIndices(), default_codec, writer_settings);
|
||||
|
||||
writer = data_part->getWriter(columns_list, data_part->storage.getSkipIndices(), default_codec, std::move(writer_settings));
|
||||
init();
|
||||
}
|
||||
|
||||
|
@ -20,7 +20,8 @@ MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream(
|
||||
writer_settings.filename_suffix = filename_suffix;
|
||||
writer_settings.skip_offsets = skip_offsets_;
|
||||
|
||||
writer = data_part->getWriter(header.getNamesAndTypesList(), indices_to_recalc, default_codec, writer_settings, index_granularity);
|
||||
writer = data_part->getWriter(header.getNamesAndTypesList(), indices_to_recalc,
|
||||
default_codec,std::move(writer_settings), index_granularity);
|
||||
writer->setWrittenOffsetColumns(offset_columns_);
|
||||
writer->initSkipIndices();
|
||||
}
|
||||
|
@ -1,8 +1,6 @@
|
||||
#pragma once
|
||||
|
||||
#include <Storages/MergeTree/IMergedBlockOutputStream.h>
|
||||
#include <Storages/MergeTree/IMergeTreeDataPart.h>
|
||||
#include <Storages/MergeTree/MergeTreeDataPartWriterWide.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
@ -24,6 +24,7 @@ namespace DB
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int CORRUPTED_DATA;
|
||||
extern const int UNKNOWN_PART_TYPE;
|
||||
}
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user