DataType in enumerate streams

This commit is contained in:
alesapin 2020-09-18 14:37:58 +03:00
parent 748fb74de2
commit e96a3ac5f3
16 changed files with 61 additions and 30 deletions

View File

@ -30,6 +30,20 @@ CompressionCodecMultiple::CompressionCodecMultiple(Codecs codecs_)
setCodecDescription("", arguments);
}
/// Builds a new codec chain from the general-purpose sub-codecs of `codec`.
///
/// A sub-codec is kept when isGenericCompression() is true. This matches how
/// tryGetGeneralCompressionCodecs() treats a single codec: a generic codec is
/// returned as is, a specialized one is dropped.
///
/// Returns nullptr when `codec` is null or when no sub-codec is kept.
CompressionCodecPtr CompressionCodecMultiple::filterNonGeneralCompressionCodecs(const CompressionCodecMultiple * codec)
{
    /// The caller passes the raw result of a dynamic_cast, which may be null.
    if (!codec)
        return nullptr;

    Codecs filtered;
    for (const auto & subcodec : codec->codecs)
    {
        /// Non-generic (specialized) codecs are the ones being filtered out.
        if (subcodec->isGenericCompression())
            filtered.push_back(subcodec);
    }

    if (filtered.empty())
        return nullptr;

    return std::make_shared<CompressionCodecMultiple>(filtered);
}
uint8_t CompressionCodecMultiple::getMethodByte() const
{
return static_cast<uint8_t>(CompressionMethodByte::Multiple);

View File

@ -17,6 +17,8 @@ public:
static std::vector<uint8_t> getCodecsBytesFromData(const char * source);
/// Builds a new CompressionCodecMultiple from the sub-codecs of `codec`, selected by their
/// isGenericCompression() flag. Returns nullptr when no sub-codec is selected.
static CompressionCodecPtr filterNonGeneralCompressionCodecs(const CompressionCodecMultiple * codec);
void updateHash(SipHash & hash) const override;
protected:

View File

@ -7,6 +7,7 @@
#include <Common/Exception.h>
#include <Parsers/queryToString.h>
#include <Parsers/ASTIdentifier.h>
#include <Compression/CompressionCodecMultiple.h>
namespace DB
@ -128,4 +129,15 @@ uint8_t ICompressionCodec::readMethod(const char * source)
return static_cast<uint8_t>(source[0]);
}
/// Reduces `codec` according to its isGenericCompression() property.
///  - Method byte is Multiple: the result of
///    CompressionCodecMultiple::filterNonGeneralCompressionCodecs for that chain.
///  - Any other codec: `codec` itself if it reports isGenericCompression(), nullptr otherwise.
/// A null `codec` yields nullptr.
CompressionCodecPtr tryGetGeneralCompressionCodecs(const CompressionCodecPtr & codec)
{
    if (!codec)
        return nullptr;

    if (codec->getMethodByte() == static_cast<uint8_t>(CompressionMethodByte::Multiple))
    {
        const auto * multiple = dynamic_cast<const CompressionCodecMultiple *>(codec.get());

        /// dynamic_cast returns null on a type mismatch; never forward a null pointer
        /// to a function that dereferences it.
        if (!multiple)
            return nullptr;

        return CompressionCodecMultiple::filterNonGeneralCompressionCodecs(multiple);
    }

    if (!codec->isGenericCompression())
        return nullptr;

    return codec;
}
}

View File

@ -95,4 +95,6 @@ private:
ASTPtr full_codec_desc;
};
/// For a codec whose method byte is Multiple, returns the result of
/// CompressionCodecMultiple::filterNonGeneralCompressionCodecs. Otherwise returns `codec`
/// itself if it reports isGenericCompression(), and nullptr if it does not.
CompressionCodecPtr tryGetGeneralCompressionCodecs(const CompressionCodecPtr & codec);
}

View File

@ -151,7 +151,7 @@ namespace
void DataTypeArray::enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const
{
path.push_back(Substream::ArraySizes);
callback(path);
callback(path, *this);
path.back() = Substream::ArrayElements;
nested->enumerateStreams(callback, path);
path.pop_back();

View File

@ -54,7 +54,7 @@ void DataTypeLowCardinality::enumerateStreams(const StreamCallback & callback, S
path.push_back(Substream::DictionaryKeys);
dictionary_type->enumerateStreams(callback, path);
path.back() = Substream::DictionaryIndexes;
callback(path);
callback(path, *this);
path.pop_back();
}

View File

@ -44,7 +44,7 @@ bool DataTypeNullable::onlyNull() const
void DataTypeNullable::enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const
{
path.push_back(Substream::NullMap);
callback(path);
callback(path, *this);
path.back() = Substream::NullableElements;
nested_data_type->enumerateStreams(callback, path);
path.pop_back();

View File

@ -99,15 +99,17 @@ public:
/// Index of tuple element, starting at 1.
String tuple_element_name;
bool is_specialized_codecs_allowed = true;
Substream(Type type_) : type(type_) {}
};
using SubstreamPath = std::vector<Substream>;
using StreamCallback = std::function<void(const SubstreamPath &)>;
using StreamCallback = std::function<void(const SubstreamPath &, const IDataType & substream_type)>;
virtual void enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const
{
callback(path);
callback(path, *this);
}
/// Convenience overload accepting a temporary path. Inside the body `path` is a named
/// variable (an lvalue), so the call resolves to the virtual `SubstreamPath &` overload.
void enumerateStreams(const StreamCallback & callback, SubstreamPath && path) const { enumerateStreams(callback, path); }
/// Convenience overload that starts the enumeration from an empty substream path.
void enumerateStreams(const StreamCallback & callback) const { enumerateStreams(callback, {}); }
@ -685,4 +687,3 @@ template <> inline constexpr bool IsDataTypeDateOrDateTime<DataTypeDateTime> = t
template <> inline constexpr bool IsDataTypeDateOrDateTime<DataTypeDateTime64> = true;
}

View File

@ -51,7 +51,7 @@ NameSet IMergedBlockOutputStream::removeEmptyColumnsFromPart(
for (const NameAndTypePair & column : columns)
{
column.type->enumerateStreams(
[&](const IDataType::SubstreamPath & substream_path)
[&](const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_type */)
{
++stream_counts[IDataType::getFileNameForStream(column.name, substream_path)];
},
@ -62,7 +62,7 @@ NameSet IMergedBlockOutputStream::removeEmptyColumnsFromPart(
const String mrk_extension = data_part->getMarksFileExtension();
for (const auto & column_name : empty_columns)
{
IDataType::StreamCallback callback = [&](const IDataType::SubstreamPath & substream_path)
IDataType::StreamCallback callback = [&](const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_type */)
{
String stream_name = IDataType::getFileNameForStream(column_name, substream_path);
/// Delete files if they are no longer shared with another column.

View File

@ -1449,7 +1449,7 @@ NameToNameVector MergeTreeDataMergerMutator::collectFilesForRenames(
for (const NameAndTypePair & column : source_part->getColumns())
{
column.type->enumerateStreams(
[&](const IDataType::SubstreamPath & substream_path)
[&](const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_type */)
{
++stream_counts[IDataType::getFileNameForStream(column.name, substream_path)];
},
@ -1467,7 +1467,7 @@ NameToNameVector MergeTreeDataMergerMutator::collectFilesForRenames(
}
else if (command.type == MutationCommand::Type::DROP_COLUMN)
{
IDataType::StreamCallback callback = [&](const IDataType::SubstreamPath & substream_path)
IDataType::StreamCallback callback = [&](const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_type */)
{
String stream_name = IDataType::getFileNameForStream(command.column_name, substream_path);
/// Delete files if they are no longer shared with another column.
@ -1488,7 +1488,7 @@ NameToNameVector MergeTreeDataMergerMutator::collectFilesForRenames(
String escaped_name_from = escapeForFileName(command.column_name);
String escaped_name_to = escapeForFileName(command.rename_to);
IDataType::StreamCallback callback = [&](const IDataType::SubstreamPath & substream_path)
IDataType::StreamCallback callback = [&](const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_type */)
{
String stream_from = IDataType::getFileNameForStream(command.column_name, substream_path);
@ -1521,7 +1521,7 @@ NameSet MergeTreeDataMergerMutator::collectFilesToSkip(
/// Skip updated files
for (const auto & entry : updated_header)
{
IDataType::StreamCallback callback = [&](const IDataType::SubstreamPath & substream_path)
IDataType::StreamCallback callback = [&](const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_type */)
{
String stream_name = IDataType::getFileNameForStream(entry.name, substream_path);
files_to_skip.insert(stream_name + ".bin");

View File

@ -77,7 +77,7 @@ ColumnSize MergeTreeDataPartWide::getColumnSizeImpl(
if (checksums.empty())
return size;
type.enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
type.enumerateStreams([&](const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_type */)
{
String file_name = IDataType::getFileNameForStream(column_name, substream_path);
@ -155,7 +155,7 @@ void MergeTreeDataPartWide::checkConsistency(bool require_part_metadata) const
for (const NameAndTypePair & name_type : columns)
{
IDataType::SubstreamPath stream_path;
name_type.type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
name_type.type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_type */)
{
String file_name = IDataType::getFileNameForStream(name_type.name, substream_path);
String mrk_file_name = file_name + index_granularity_info.marks_file_extension;
@ -177,7 +177,7 @@ void MergeTreeDataPartWide::checkConsistency(bool require_part_metadata) const
std::optional<UInt64> marks_size;
for (const NameAndTypePair & name_type : columns)
{
name_type.type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
name_type.type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_type */)
{
auto file_path = path + IDataType::getFileNameForStream(name_type.name, substream_path) + index_granularity_info.marks_file_extension;
@ -205,7 +205,7 @@ bool MergeTreeDataPartWide::hasColumnFiles(const String & column_name, const IDa
{
bool res = true;
type.enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
type.enumerateStreams([&](const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_type */)
{
String file_name = IDataType::getFileNameForStream(column_name, substream_path);
@ -222,7 +222,7 @@ bool MergeTreeDataPartWide::hasColumnFiles(const String & column_name, const IDa
String MergeTreeDataPartWide::getFileNameForColumn(const NameAndTypePair & column) const
{
String filename;
column.type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
column.type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_type */)
{
if (filename.empty())
filename = IDataType::getFileNameForStream(column.name, substream_path);

View File

@ -37,7 +37,7 @@ void MergeTreeDataPartWriterWide::addStreams(
const CompressionCodecPtr & effective_codec,
size_t estimated_size)
{
IDataType::StreamCallback callback = [&] (const IDataType::SubstreamPath & substream_path)
IDataType::StreamCallback callback = [&] (const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_type */)
{
String stream_name = IDataType::getFileNameForStream(name, substream_path);
/// Shared offsets for Nested type.
@ -130,7 +130,7 @@ void MergeTreeDataPartWriterWide::writeSingleMark(
size_t number_of_rows,
DB::IDataType::SubstreamPath & path)
{
type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path)
type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_type */)
{
bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
@ -170,7 +170,7 @@ size_t MergeTreeDataPartWriterWide::writeSingleGranule(
type.serializeBinaryBulkWithMultipleStreams(column, from_row, number_of_rows, serialize_settings, serialization_state);
/// So that instead of the marks pointing to the end of the compressed block, there were marks pointing to the beginning of the next one.
type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path)
type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_type */)
{
bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
@ -251,7 +251,7 @@ void MergeTreeDataPartWriterWide::writeColumn(
current_column_mark++;
}
type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path)
type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_type */)
{
bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
if (is_offsets)
@ -312,7 +312,7 @@ void MergeTreeDataPartWriterWide::writeFinalMark(
{
writeSingleMark(column_name, *column_type, offset_columns, 0, path);
/// Memoize information about offsets
column_type->enumerateStreams([&] (const IDataType::SubstreamPath & substream_path)
column_type->enumerateStreams([&] (const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_type */)
{
bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
if (is_offsets)

View File

@ -162,7 +162,7 @@ size_t MergeTreeReaderWide::readRows(size_t from_mark, bool continue_reading, si
void MergeTreeReaderWide::addStreams(const String & name, const IDataType & type,
const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type)
{
IDataType::StreamCallback callback = [&] (const IDataType::SubstreamPath & substream_path)
IDataType::StreamCallback callback = [&] (const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_type */)
{
String stream_name = IDataType::getFileNameForStream(name, substream_path);

View File

@ -120,7 +120,7 @@ IMergeTreeDataPart::Checksums checkDataPart(
{
for (const auto & column : columns_list)
{
column.type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
column.type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_type */)
{
String file_name = IDataType::getFileNameForStream(column.name, substream_path) + ".bin";
checksums_data.files[file_name] = checksum_compressed_file(disk, path + file_name);

View File

@ -360,7 +360,7 @@ void LogBlockOutputStream::writeData(const String & name, const IDataType & type
{
IDataType::SerializeBinaryBulkSettings settings;
type.enumerateStreams([&] (const IDataType::SubstreamPath & path)
type.enumerateStreams([&] (const IDataType::SubstreamPath & path, const IDataType & /* substream_type */)
{
String stream_name = IDataType::getFileNameForStream(name, path);
if (written_streams.count(stream_name))
@ -380,7 +380,7 @@ void LogBlockOutputStream::writeData(const String & name, const IDataType & type
if (serialize_states.count(name) == 0)
type.serializeBinaryBulkStatePrefix(settings, serialize_states[name]);
type.enumerateStreams([&] (const IDataType::SubstreamPath & path)
type.enumerateStreams([&] (const IDataType::SubstreamPath & path, const IDataType & /* substream_type */)
{
String stream_name = IDataType::getFileNameForStream(name, path);
if (written_streams.count(stream_name))
@ -398,7 +398,7 @@ void LogBlockOutputStream::writeData(const String & name, const IDataType & type
type.serializeBinaryBulkWithMultipleStreams(column, 0, 0, settings, serialize_states[name]);
type.enumerateStreams([&] (const IDataType::SubstreamPath & path)
type.enumerateStreams([&] (const IDataType::SubstreamPath & path, const IDataType & /* substream_type */)
{
String stream_name = IDataType::getFileNameForStream(name, path);
if (!written_streams.emplace(stream_name).second)
@ -485,7 +485,7 @@ void StorageLog::addFiles(const String & column_name, const IDataType & type)
throw Exception("Duplicate column with name " + column_name + " in constructor of StorageLog.",
ErrorCodes::DUPLICATE_COLUMN);
IDataType::StreamCallback stream_callback = [&] (const IDataType::SubstreamPath & substream_path)
IDataType::StreamCallback stream_callback = [&] (const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_type */)
{
String stream_name = IDataType::getFileNameForStream(column_name, substream_path);
@ -592,7 +592,7 @@ const StorageLog::Marks & StorageLog::getMarksWithRealRowCount(const StorageMeta
* (Example: for Array data type, first stream is array sizes; and number of array sizes is the number of arrays).
*/
IDataType::SubstreamPath substream_root_path;
column_type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
column_type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_type */)
{
if (filename.empty())
filename = IDataType::getFileNameForStream(column_name, substream_path);

View File

@ -389,7 +389,7 @@ void StorageTinyLog::addFiles(const String & column_name, const IDataType & type
throw Exception("Duplicate column with name " + column_name + " in constructor of StorageTinyLog.",
ErrorCodes::DUPLICATE_COLUMN);
IDataType::StreamCallback stream_callback = [&] (const IDataType::SubstreamPath & substream_path)
IDataType::StreamCallback stream_callback = [&] (const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_type */)
{
String stream_name = IDataType::getFileNameForStream(column_name, substream_path);
if (!files.count(stream_name))