First working test

This commit is contained in:
alesapin 2020-09-01 13:49:53 +03:00
parent b751319f9e
commit f0dc5a3085
11 changed files with 142 additions and 15 deletions

View File

@ -134,6 +134,7 @@ Block TTLBlockInputStream::readImpl()
removeValuesWithExpiredColumnTTL(block);
updateMovesTTL(block);
updateRecompressionTTL(block);
return block;
}
@ -395,6 +396,33 @@ void TTLBlockInputStream::updateMovesTTL(Block & block)
block.erase(column);
}
/// Folds the recompression TTL timestamps of every row of `block` into
/// `new_ttl_infos.recompression_ttl`.
///
/// For each recompression TTL rule of the metadata snapshot the rule's result column is
/// computed on demand (only when the block does not already contain it), each row's
/// timestamp is passed to the rule's TTL info, and the result columns that were missing
/// on entry are erased from the block again before returning.
void TTLBlockInputStream::updateRecompressionTTL(Block & block)
{
    /// Result columns materialized by this call; they are dropped at the end.
    std::vector<String> temporary_columns;

    for (const auto & recompression_rule : metadata_snapshot->getRecompressionTTLs())
    {
        const auto & result_name = recompression_rule.result_column;

        /// operator[] creates the info entry for this rule if it does not exist yet.
        auto & rule_info = new_ttl_infos.recompression_ttl[result_name];

        const bool already_present = block.has(result_name);
        if (!already_present)
        {
            temporary_columns.push_back(result_name);
            recompression_rule.expression->execute(block);
        }

        const IColumn * timestamps = block.getByName(result_name).column.get();

        size_t row = 0;
        while (row < block.rows())
        {
            rule_info.update(getTimestampByIndex(timestamps, row));
            ++row;
        }
    }

    for (const String & name : temporary_columns)
        block.erase(name);
}
UInt32 TTLBlockInputStream::getTimestampByIndex(const IColumn * column, size_t ind)
{
if (const ColumnUInt16 * column_date = typeid_cast<const ColumnUInt16 *>(column))

View File

@ -78,6 +78,8 @@ private:
/// Updates TTL for moves
void updateMovesTTL(Block & block);
void updateRecompressionTTL(Block & block);
UInt32 getTimestampByIndex(const IColumn * column, size_t ind);
bool isTTLExpired(time_t ttl) const;
};

View File

@ -533,8 +533,16 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run)
/// Special step to recalculate affected indices and TTL expressions.
stages.emplace_back(context);
for (const auto & column : unchanged_columns)
{
std::cerr << "ADDING UNCHANGED COLUMN TO STAGE:" << column << std::endl;
stages.back().column_to_updated.emplace(
column, std::make_shared<ASTIdentifier>(column));
std::cerr << "OUTPUT COLUMNS:" << stages.back().output_columns.size() << std::endl;
for (const auto & col : stages.back().output_columns)
{
std::cerr << "OUTPUT COLUMN:" << col << std::endl;
}
}
}
}

View File

@ -3064,8 +3064,10 @@ CompressionCodecPtr MergeTreeData::getCompressionCodecForPart(size_t part_size_c
auto metadata_snapshot = getInMemoryMetadataPtr();
const auto & recompression_ttl_entries = metadata_snapshot->getRecompressionTTLs();
std::cerr << "RECOMPRESSION ENTRIES SIZE:" << recompression_ttl_entries.size() << std::endl;
for (auto ttl_entry_it = recompression_ttl_entries.begin(); ttl_entry_it != recompression_ttl_entries.end(); ++ttl_entry_it)
{
std::cerr << "RECOMPRESSION TTL SIZE:" << ttl_infos.recompression_ttl.size() << std::endl;
auto ttl_info_it = ttl_infos.recompression_ttl.find(ttl_entry_it->result_column);
/// Prefer TTL rule which went into action last.
if (ttl_info_it != ttl_infos.recompression_ttl.end()
@ -3078,7 +3080,15 @@ CompressionCodecPtr MergeTreeData::getCompressionCodecForPart(size_t part_size_c
}
if (max_max_ttl)
{
std::cerr << "BEST ENTRY FOUND, MAX MAX:" << max_max_ttl << std::endl;
std::cerr << "RECOMPRESSION IS NULLPTR:" << (best_entry_it->recompression_codec == nullptr) << std::endl;
return CompressionCodecFactory::instance().get(best_entry_it->recompression_codec, {});
}
else
{
std::cerr << "NOT FOUND NEW RECOMPRESSION\n";
}
return global_context.chooseCompressionCodec(
part_size_compressed,

View File

@ -659,9 +659,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
/// (which is locked in shared mode when input streams are created) and when inserting new data
/// the order is reverse. This annoys TSan even though one lock is locked in shared mode and thus
/// deadlock is impossible.
auto compression_codec = data.global_context.chooseCompressionCodec(
merge_entry->total_size_bytes_compressed,
static_cast<double> (merge_entry->total_size_bytes_compressed) / data.getTotalActiveSizeInBytes());
auto compression_codec = data.getCompressionCodecForPart(merge_entry->total_size_bytes_compressed, new_data_part->ttl_infos, time_of_merge);
/// TODO: Should it go through IDisk interface?
String rows_sources_file_path;
@ -1082,15 +1080,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
auto disk = new_data_part->volume->getDisk();
String new_part_tmp_path = new_data_part->getFullRelativePath();
/// Note: this is done before creating input streams, because otherwise data.data_parts_mutex
/// (which is locked in data.getTotalActiveSizeInBytes())
/// (which is locked in shared mode when input streams are created) and when inserting new data
/// the order is reverse. This annoys TSan even though one lock is locked in shared mode and thus
/// deadlock is impossible.
auto compression_codec = context.chooseCompressionCodec(
source_part->getBytesOnDisk(),
static_cast<double>(source_part->getBytesOnDisk()) / data.getTotalActiveSizeInBytes());
disk->createDirectories(new_part_tmp_path);
/// Don't change granularity type while mutating subset of columns
@ -1100,11 +1089,27 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
bool need_remove_expired_values = false;
if (in && shouldExecuteTTL(metadata_snapshot, in->getHeader().getNamesAndTypesList().getNames(), commands_for_part))
{
std::cerr << "GOING TO MATERIALIZE TTL\n";
need_remove_expired_values = true;
}
else
{
std::cerr << "NOT GOING TO MATERIALIZE TTL\n";
std::cerr << "IN IS NULL:" << (in == nullptr) << std::endl;
}
/// All columns from part are changed and may be some more that were missing before in part
if (!isWidePart(source_part) || (interpreter && interpreter->isAffectingAllColumns()))
{
std::cerr << "MUTATING ALL PART COLUMNS\n";
/// Note: this is done before creating input streams, because otherwise data.data_parts_mutex
/// (which is locked in data.getTotalActiveSizeInBytes())
/// (which is locked in shared mode when input streams are created) and when inserting new data
/// the order is reverse. This annoys TSan even though one lock is locked in shared mode and thus
/// deadlock is impossible.
auto compression_codec = data.getCompressionCodecForPart(source_part->getBytesOnDisk(), source_part->ttl_infos, time_of_mutation);
auto part_indices = getIndicesForNewDataPart(metadata_snapshot->getSecondaryIndices(), for_file_renames);
mutateAllPartColumns(
new_data_part,
@ -1121,6 +1126,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
}
else /// TODO: check that we modify only non-key columns in this case.
{
std::cerr << "MUTATING SOME PART COLUMNS\n";
/// We will modify only some of the columns. Other columns and key values can be copied as-is.
auto indices_to_recalc = getIndicesToRecalculate(in, updated_header.getNamesAndTypesList(), metadata_snapshot, context);
@ -1128,7 +1135,13 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
NameToNameVector files_to_rename = collectFilesForRenames(source_part, for_file_renames, mrk_extension);
if (need_remove_expired_values)
{
files_to_skip.insert("ttl.txt");
}
for (const auto & name : files_to_skip)
{
std::cerr << "SKIPPING " << name << std::endl;
}
/// Create hardlinks for unchanged files
for (auto it = disk->iterateDirectory(source_part->getFullRelativePath()); it->isValid(); it->next())
@ -1157,8 +1170,12 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
new_data_part->checksums = source_part->checksums;
auto compression_codec = source_part->default_codec;
if (in)
{
std::cerr << "HEADER:" << updated_header.dumpStructure() << std::endl;
std::cerr << "IN HEADER:" << in->getHeader().dumpStructure() << std::endl;
mutateSomePartColumns(
source_part,
metadata_snapshot,

View File

@ -13,6 +13,7 @@
#include <Poco/File.h>
#include <Common/typeid_cast.h>
#include <Parsers/queryToString.h>
namespace ProfileEvents
{
@ -234,8 +235,9 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
for (const auto & ttl_entry : move_ttl_entries)
updateTTL(ttl_entry, move_ttl_infos, move_ttl_infos.moves_ttl[ttl_entry.result_column], block, false);
time_t current_time = time(nullptr);
NamesAndTypesList columns = metadata_snapshot->getColumns().getAllPhysical().filter(block.getNames());
ReservationPtr reservation = data.reserveSpacePreferringTTLRules(expected_size, move_ttl_infos, time(nullptr));
ReservationPtr reservation = data.reserveSpacePreferringTTLRules(expected_size, move_ttl_infos, current_time);
VolumePtr volume = data.getStoragePolicy()->getVolume(0);
auto new_data_part = data.createPart(
@ -306,7 +308,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
/// This effectively chooses minimal compression method:
/// either default lz4 or compression method with zero thresholds on absolute and relative part size.
auto compression_codec = data.global_context.chooseCompressionCodec(0, 0);
auto compression_codec = data.getCompressionCodecForPart(0, new_data_part->ttl_infos, current_time);
std::cerr << "SELECTED CODEC:" << queryToString(compression_codec->getCodecDesc()) << std::endl;
const auto & index_factory = MergeTreeIndexFactory::instance();
MergedBlockOutputStream out(new_data_part, metadata_snapshot, columns, index_factory.getMany(metadata_snapshot->getSecondaryIndices()), compression_codec);

View File

@ -558,8 +558,11 @@ static StoragePtr create(const StorageFactory::Arguments & args)
metadata.sampling_key = KeyDescription::getKeyFromAST(args.storage_def->sample_by->ptr(), metadata.columns, args.context);
if (args.storage_def->ttl_table)
{
std::cerr << "Parsing table ttl in description\n";
metadata.table_ttl = TTLTableDescription::getTTLForTableFromAST(
args.storage_def->ttl_table->ptr(), metadata.columns, args.context, metadata.primary_key);
}
if (args.query.columns_list && args.query.columns_list->indices)
for (auto & index : args.query.columns_list->indices->children)

View File

@ -124,7 +124,7 @@ TTLTableDescription StorageInMemoryMetadata::getTableTTLs() const
/// Returns true when at least one table-level TTL check succeeds:
/// hasAnyMoveTTL(), hasRowsTTL() or hasAnyRecompressionTTL().
///
/// The earlier `return hasAnyMoveTTL() || hasRowsTTL();` is removed: it returned before
/// recompression TTLs were consulted and left the complete check unreachable.
bool StorageInMemoryMetadata::hasAnyTableTTL() const
{
    return hasAnyMoveTTL() || hasRowsTTL() || hasAnyRecompressionTTL();
}
TTLColumnsDescription StorageInMemoryMetadata::getColumnTTLs() const
@ -207,6 +207,9 @@ ColumnDependencies StorageInMemoryMetadata::getColumnDependencies(const NameSet
}
}
for (const auto & entry : getRecompressionTTLs())
add_dependent_columns(entry.expression, required_ttl_columns);
for (const auto & [name, entry] : getColumnTTLs())
{
if (add_dependent_columns(entry.expression, required_ttl_columns))

View File

@ -89,6 +89,7 @@ TTLDescription::TTLDescription(const TTLDescription & other)
, aggregate_descriptions(other.aggregate_descriptions)
, destination_type(other.destination_type)
, destination_name(other.destination_name)
, recompression_codec(other.recompression_codec)
{
if (other.expression)
expression = std::make_shared<ExpressionActions>(*other.expression);
@ -125,6 +126,12 @@ TTLDescription & TTLDescription::operator=(const TTLDescription & other)
aggregate_descriptions = other.aggregate_descriptions;
destination_type = other.destination_type;
destination_name = other.destination_name;
if (other.recompression_codec)
recompression_codec = other.recompression_codec->clone();
else
recompression_codec.reset();
return * this;
}
@ -266,6 +273,7 @@ TTLDescription TTLDescription::getTTLFromAST(
}
else if (ttl_element->mode == TTLMode::RECOMPRESS)
{
std::cerr << "GOT INTO RECOMPRESS\n";
result.recompression_codec =
CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(
ttl_element->recompression_codec, {}, !context.getSettingsRef().allow_suspicious_codecs);
@ -283,6 +291,7 @@ TTLTableDescription::TTLTableDescription(const TTLTableDescription & other)
: definition_ast(other.definition_ast ? other.definition_ast->clone() : nullptr)
, rows_ttl(other.rows_ttl)
, move_ttl(other.move_ttl)
, recompression_ttl(other.recompression_ttl)
{
}
@ -298,6 +307,7 @@ TTLTableDescription & TTLTableDescription::operator=(const TTLTableDescription &
rows_ttl = other.rows_ttl;
move_ttl = other.move_ttl;
recompression_ttl = other.recompression_ttl;
return *this;
}
@ -327,6 +337,7 @@ TTLTableDescription TTLTableDescription::getTTLForTableFromAST(
}
else if (ttl.mode == TTLMode::RECOMPRESS)
{
std::cerr << "GOT RECOMPRESSIOn TTL\n";
result.recompression_ttl.emplace_back(std::move(ttl));
}
else

View File

@ -0,0 +1,10 @@
3000
1_1_1_0 LZ4
2_2_2_0 ZSTD(17)
3_3_3_0 LZ4HC(10)
1_1_1_0_4 LZ4
2_2_2_0_4 ZSTD(17)
3_3_3_0_4 LZ4HC(10)
1_1_1_1_4 LZ4
2_2_2_1_4 ZSTD(12)
3_3_3_1_4 ZSTD(12)

View File

@ -0,0 +1,32 @@
-- Test for TTL ... RECOMPRESS CODEC(...): the codec reported for each part in system.parts
-- is inspected after INSERT, after ALTER ... MODIFY TTL and after OPTIMIZE ... FINAL.

DROP TABLE IF EXISTS recompression_table;

CREATE TABLE recompression_table
(
    dt DateTime,
    key UInt64,
    value String
)
ENGINE = MergeTree()
ORDER BY tuple()
PARTITION BY key
TTL
    dt + INTERVAL 1 MONTH RECOMPRESS CODEC(ZSTD(17)),
    dt + INTERVAL 1 YEAR RECOMPRESS CODEC(LZ4HC(10));

-- Three inserts with distinct partition keys: rows dated now, two months ago and two years ago.
INSERT INTO recompression_table SELECT now(), 1, toString(number) FROM numbers(1000);
INSERT INTO recompression_table SELECT now() - INTERVAL 2 MONTH, 2, toString(number) FROM numbers(1000, 1000);
INSERT INTO recompression_table SELECT now() - INTERVAL 2 YEAR, 3, toString(number) FROM numbers(2000, 1000);

SELECT COUNT() FROM recompression_table;

-- Codec of every active part right after the inserts.
SELECT name, default_compression_codec
FROM system.parts
WHERE table = 'recompression_table' AND active = 1 AND database = currentDatabase()
ORDER BY name;

-- Replace both rules with a single one; mutations_sync = 2 makes the ALTER wait for the mutation.
ALTER TABLE recompression_table
    MODIFY TTL dt + INTERVAL 1 MONTH RECOMPRESS CODEC(ZSTD(12))
    SETTINGS mutations_sync = 2;

-- Codec of every active part after the TTL was modified.
SELECT name, default_compression_codec
FROM system.parts
WHERE table = 'recompression_table' AND active = 1 AND database = currentDatabase()
ORDER BY name;

OPTIMIZE TABLE recompression_table FINAL;

-- Codec of every active part after the forced merge.
SELECT name, default_compression_codec
FROM system.parts
WHERE table = 'recompression_table' AND active = 1 AND database = currentDatabase()
ORDER BY name;

DROP TABLE IF EXISTS recompression_table;