Almost working version

This commit is contained in:
alesapin 2020-09-21 17:22:13 +03:00
parent a3e999784b
commit 4debccb7fe
9 changed files with 56 additions and 40 deletions

View File

@ -30,20 +30,6 @@ CompressionCodecMultiple::CompressionCodecMultiple(Codecs codecs_)
setCodecDescription("", arguments);
}
CompressionCodecPtr CompressionCodecMultiple::filterNonGeneralCompressionCodecs(const CompressionCodecMultiple * codec)
{
Codecs filtered;
for (const auto & subcodec : codec->codecs)
if (!subcodec->isGenericCompression())
filtered.push_back(subcodec);
if (filtered.empty())
return nullptr;
return std::make_shared<CompressionCodecMultiple>(filtered);
}
uint8_t CompressionCodecMultiple::getMethodByte() const
{
return static_cast<uint8_t>(CompressionMethodByte::Multiple);

View File

@ -17,8 +17,6 @@ public:
static std::vector<uint8_t> getCodecsBytesFromData(const char * source);
static CompressionCodecPtr filterNonGeneralCompressionCodecs(const CompressionCodecMultiple * codec);
void updateHash(SipHash & hash) const override;
protected:

View File

@ -99,7 +99,25 @@ ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST(const ASTPtr
}
else
{
result_codec = getImpl(codec_family_name, codec_arguments, column_type);
if (column_type)
{
IDataType::StreamCallback callback = [&](const IDataType::SubstreamPath & substream_path, const IDataType & substream_type)
{
if (IDataType::isSpecialCompressionAllowed(substream_path))
result_codec = getImpl(codec_family_name, codec_arguments, &substream_type);
};
IDataType::SubstreamPath stream_path;
column_type->enumerateStreams(callback, stream_path);
if (!result_codec)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot find any substream with data type for type {}. It's a bug", column_type->getName());
}
else
{
result_codec = getImpl(codec_family_name, codec_arguments, nullptr);
}
codecs_descriptions->children.emplace_back(result_codec->getCodecDesc());
}
@ -140,13 +158,16 @@ ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST(const ASTPtr
" (Note: you can enable setting 'allow_suspicious_codecs' to skip this check).", ErrorCodes::BAD_ARGUMENTS);
}
return ast;
std::shared_ptr<ASTFunction> result = std::make_shared<ASTFunction>();
result->name = "CODEC";
result->arguments = codecs_descriptions;
return result;
}
throw Exception("Unknown codec family: " + queryToString(ast), ErrorCodes::UNKNOWN_CODEC);
}
CompressionCodecPtr CompressionCodecFactory::get(const ASTPtr & ast, const IDataType * column_type, CompressionCodecPtr current_default) const
CompressionCodecPtr CompressionCodecFactory::get(const ASTPtr & ast, const IDataType * column_type, CompressionCodecPtr current_default, bool only_generic) const
{
if (current_default == nullptr)
current_default = default_codec;
@ -172,10 +193,16 @@ CompressionCodecPtr CompressionCodecFactory::get(const ASTPtr & ast, const IData
else
throw Exception("Unexpected AST element for compression codec", ErrorCodes::UNEXPECTED_AST_STRUCTURE);
CompressionCodecPtr codec;
if (codec_family_name == DEFAULT_CODEC_NAME)
codecs.emplace_back(current_default);
codec = current_default;
else
codecs.emplace_back(getImpl(codec_family_name, codec_arguments, column_type));
codec = getImpl(codec_family_name, codec_arguments, column_type);
if (only_generic && !codec->isGenericCompression())
continue;
codecs.emplace_back(codec);
}
CompressionCodecPtr res;
@ -184,6 +211,8 @@ CompressionCodecPtr CompressionCodecFactory::get(const ASTPtr & ast, const IData
return codecs.back();
else if (codecs.size() > 1)
return std::make_shared<CompressionCodecMultiple>(codecs);
else
return nullptr;
}
throw Exception("Unexpected AST structure for compression codec: " + queryToString(ast), ErrorCodes::UNEXPECTED_AST_STRUCTURE);

View File

@ -52,10 +52,10 @@ public:
/// be able to work without information about type. Also AST can contain
/// codec, which can be alias to current default codec, which can be changed
/// in runtime.
CompressionCodecPtr get(const ASTPtr & ast, const IDataType * column_type, CompressionCodecPtr current_default = nullptr) const;
CompressionCodecPtr get(const ASTPtr & ast, const DataTypePtr & column_type, CompressionCodecPtr current_default = nullptr) const
CompressionCodecPtr get(const ASTPtr & ast, const IDataType * column_type, CompressionCodecPtr current_default = nullptr, bool only_generic = false) const;
CompressionCodecPtr get(const ASTPtr & ast, const DataTypePtr & column_type, CompressionCodecPtr current_default = nullptr, bool only_generic = false) const
{
return get(ast, column_type.get(), current_default);
return get(ast, column_type.get(), current_default, only_generic);
}
/// Get codec by method byte (no params available)
@ -67,6 +67,7 @@ public:
/// Register codec with parameters and column type
void registerCompressionCodecWithType(const String & family_name, std::optional<uint8_t> byte_code, CreatorWithType creator);
/// Register codec with parameters
void registerCompressionCodec(const String & family_name, std::optional<uint8_t> byte_code, Creator creator);
/// Register codec without parameters

View File

@ -129,15 +129,4 @@ uint8_t ICompressionCodec::readMethod(const char * source)
return static_cast<uint8_t>(source[0]);
}
CompressionCodecPtr tryGetGeneralCompressionCodecs(const CompressionCodecPtr & codec)
{
if (codec->getMethodByte() == static_cast<uint8_t>(CompressionMethodByte::Multiple))
return CompressionCodecMultiple::filterNonGeneralCompressionCodecs(dynamic_cast<const CompressionCodecMultiple *>(codec.get()));
else if (!codec->isGenericCompression())
return nullptr;
else
return codec;
}
}

View File

@ -94,6 +94,4 @@ private:
ASTPtr full_codec_desc;
};
CompressionCodecPtr tryGetGeneralCompressionCodecs(const CompressionCodecPtr & codec);
}

View File

@ -130,7 +130,7 @@ String IDataType::getFileNameForStream(const String & column_name, const IDataTy
}
bool IDataType::isNonGenericCompressionAllowedForStream(const SubstreamPath & path)
bool IDataType::isSpecialCompressionAllowed(const SubstreamPath & path)
{
for (const Substream & elem : path)
{

View File

@ -442,7 +442,7 @@ public:
static String getFileNameForStream(const String & column_name, const SubstreamPath & path);
static bool isNonGenericCompressionAllowedForStream(const SubstreamPath & path);
static bool isSpecialCompressionAllowed(const SubstreamPath & path);
private:
friend class DataTypeFactory;
/// Customize this DataType

View File

@ -45,7 +45,22 @@ void MergeTreeDataPartWriterWide::addStreams(
if (column_streams.count(stream_name))
return;
auto compression_codec = CompressionCodecFactory::instance().get(effective_codec_desc, &substream_type, default_codec);
CompressionCodecPtr compression_codec;
if (IDataType::isSpecialCompressionAllowed(substream_path))
{
compression_codec = CompressionCodecFactory::instance().get(effective_codec_desc, &substream_type, default_codec);
}
else
{
compression_codec = CompressionCodecFactory::instance().get(effective_codec_desc, nullptr, default_codec, false);
}
if (compression_codec == nullptr)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"No generic compression provided for column {} with type {}",
backQuote(name),
type.getName());
column_streams[stream_name] = std::make_unique<Stream>(
stream_name,