polymorphic parts (development) alter update

This commit is contained in:
CurtizJ 2019-12-18 16:09:58 +03:00
parent 831f39a3df
commit 59faa4927b
14 changed files with 47 additions and 23 deletions

View File

@ -8,6 +8,7 @@ namespace DB
ExpressionBlockInputStream::ExpressionBlockInputStream(const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_)
: expression(expression_)
{
std::cerr << "expression: " << expression->dumpActions();
children.push_back(input);
cached_header = children.back()->getHeader();
expression->execute(cached_header, true);

View File

@ -314,6 +314,8 @@ ASTPtr MutationsInterpreter::prepareInterpreterSelectQuery(std::vector<Stage> &p
if (i > 0)
prepared_stages[i].output_columns = prepared_stages[i - 1].output_columns;
else if (!commands.additional_columns.empty())
prepared_stages[i].output_columns.insert(commands.additional_columns.begin(), commands.additional_columns.end());
if (prepared_stages[i].output_columns.size() < all_columns.size())
{

View File

@ -18,7 +18,7 @@ class Context;
class MutationsInterpreter
{
public:
MutationsInterpreter(StoragePtr storage_, std::vector<MutationCommand> commands_, const Context & context_)
MutationsInterpreter(StoragePtr storage_, MutationCommands commands_, const Context & context_)
: storage(std::move(storage_))
, commands(std::move(commands_))
, context(context_)
@ -48,7 +48,7 @@ private:
BlockInputStreamPtr addStreamsForLaterStages(const std::vector<Stage> & prepared_stages, BlockInputStreamPtr in) const;
StoragePtr storage;
std::vector<MutationCommand> commands;
MutationCommands commands;
const Context & context;
/// A sequence of mutation commands is executed as a sequence of stages. Each stage consists of several

View File

@ -847,4 +847,14 @@ void IMergeTreeDataPart::makeCloneOnDiskDetached(const DiskSpace::ReservationPtr
cloning_directory.copyTo(path_to_clone);
}
bool isCompactPart(const MergeTreeDataPartPtr & data_part)
{
return (data_part && data_part->getType() == MergeTreeDataPartType::COMPACT);
}
bool isWidePart(const MergeTreeDataPartPtr & data_part)
{
return (data_part && data_part->getType() == MergeTreeDataPartType::WIDE);
}
}

View File

@ -363,4 +363,7 @@ private:
using MergeTreeDataPartState = IMergeTreeDataPart::State;
using MergeTreeDataPartPtr = std::shared_ptr<const IMergeTreeDataPart>;
bool isCompactPart(const MergeTreeDataPartPtr & data_part);
bool isWidePart(const MergeTreeDataPartPtr & data_part);
}

View File

@ -2600,6 +2600,8 @@ void MergeTreeData::loadPartAndFixMetadata(MutableDataPartPtr part)
String full_part_path = part->getFullPath();
/// Earlier the list of columns was written incorrectly. Delete it and re-create.
/// FIXME looks not right
if (isWidePart(part))
if (Poco::File(full_part_path + "columns.txt").exists())
Poco::File(full_part_path + "columns.txt").remove();

View File

@ -909,7 +909,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTemporaryPart(
const FutureMergedMutatedPart & future_part,
const std::vector<MutationCommand> & commands,
const MutationCommands & commands,
MergeListEntry & merge_entry,
const Context & context,
DiskSpace::Reservation * space_reservation,
@ -931,13 +931,12 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
CurrentMetrics::Increment num_mutations{CurrentMetrics::PartMutation};
const auto & source_part = future_part.parts[0];
auto storage_from_source_part = StorageFroIMergeTreeDataPart::create(source_part);
auto storage_from_source_part = StorageFromMergeTreeDataPart::create(source_part);
auto context_for_reading = context;
context_for_reading.getSettingsRef().merge_tree_uniform_read_distribution = 0;
context_for_reading.getSettingsRef().max_threads = 1;
std::vector<MutationCommand> commands_for_part;
MutationCommands commands_for_part;
std::copy_if(
std::cbegin(commands), std::cend(commands),
std::back_inserter(commands_for_part),
@ -948,6 +947,9 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
command.partition, context_for_reading);
});
if (isCompactPart(source_part))
commands_for_part.additional_columns = source_part->columns.getNames();
MutationsInterpreter mutations_interpreter(storage_from_source_part, commands_for_part, context_for_reading);
if (!mutations_interpreter.isStorageTouchedByMutations())

View File

@ -102,7 +102,7 @@ public:
/// Mutate a single data part with the specified commands. Will create and return a temporary part.
MergeTreeData::MutableDataPartPtr mutatePartToTemporaryPart(
const FutureMergedMutatedPart & future_part,
const std::vector<MutationCommand> & commands,
const MutationCommands & commands,
MergeListEntry & merge_entry, const Context & context,
DiskSpace::Reservation * disk_reservation,
TableStructureReadLockHolder & table_lock_holder);

View File

@ -198,7 +198,7 @@ bool MergeTreeDataPartCompact::hasColumnFiles(const String & column_name, const
{
if (!getColumnPosition(column_name))
return false;
/// FIXME replace everywhere hardcoded "data"
auto bin_checksum = checksums.files.find(String(DATA_FILE_NAME) + DATA_FILE_EXTENSION);
auto mrk_checksum = checksums.files.find(DATA_FILE_NAME + index_granularity_info.marks_file_extension);

View File

@ -20,7 +20,8 @@ const MarkInCompressedFile & MergeTreeMarksLoader::getMark(size_t row_index, siz
if (!marks)
loadMarks();
if (column_index >= columns_num)
throw Exception("", ErrorCodes::LOGICAL_ERROR); /// FIXME better exception
throw Exception("Column index: " + toString(column_index)
+ " is out of range (" + toString(columns_num) + ")", ErrorCodes::LOGICAL_ERROR);
return (*marks)[row_index * columns_num + column_index];
}

View File

@ -216,7 +216,7 @@ void MergeTreeReaderCompact::seekToMark(size_t row_index, size_t column_index)
{
MarkInCompressedFile mark = marks_loader.getMark(row_index, column_index);
// std::cerr << "(MergeTreeReaderCompact::seekToMark) mark: (" << mark.offset_in_compressed_file << ", " << mark.offset_in_decompressed_block << "\n";
std::cerr << "(MergeTreeReaderCompact::seekToMark) mark: (" << mark.offset_in_compressed_file << ", " << mark.offset_in_decompressed_block << "\n";
try
{

View File

@ -49,11 +49,11 @@ MergedBlockOutputStream::MergedBlockOutputStream(
if (aio_threshold > 0 && !merged_column_to_size.empty())
{
for (const auto & it : columns_list)
for (const auto & column : columns_list)
{
auto it2 = merged_column_to_size.find(it.name);
if (it2 != merged_column_to_size.end())
writer_settings.estimated_size += it2->second;
auto size_it = merged_column_to_size.find(column.name);
if (size_it != merged_column_to_size.end())
writer_settings.estimated_size += size_it->second;
}
}

View File

@ -12,11 +12,11 @@ namespace DB
{
/// A Storage that allows reading from a single MergeTree data part.
class StorageFroIMergeTreeDataPart : public ext::shared_ptr_helper<StorageFroIMergeTreeDataPart>, public IStorage
class StorageFromMergeTreeDataPart : public ext::shared_ptr_helper<StorageFromMergeTreeDataPart>, public IStorage
{
friend struct ext::shared_ptr_helper<StorageFroIMergeTreeDataPart>;
friend struct ext::shared_ptr_helper<StorageFromMergeTreeDataPart>;
public:
String getName() const override { return "FroIMergeTreeDataPart"; }
String getName() const override { return "FromMergeTreeDataPart"; }
String getTableName() const override { return part->storage.getTableName() + " (part " + part->name + ")"; }
String getDatabaseName() const override { return part->storage.getDatabaseName(); }
@ -40,7 +40,7 @@ public:
}
protected:
StorageFroIMergeTreeDataPart(const MergeTreeData::DataPartPtr & part_)
StorageFromMergeTreeDataPart(const MergeTreeData::DataPartPtr & part_)
: IStorage(part_->storage.getVirtuals()), part(part_)
{
setColumns(part_->storage.getColumns());

View File

@ -2,6 +2,7 @@
#include <Parsers/ASTAlterQuery.h>
#include <Storages/IStorage_fwd.h>
#include <Core/Names.h>
#include <optional>
#include <unordered_map>
@ -46,6 +47,8 @@ public:
void writeText(WriteBuffer & out) const;
void readText(ReadBuffer & in);
Names additional_columns;
};
}