Add codec and sync interval settings

This commit is contained in:
Andrew Onyshchuk 2020-01-11 01:01:20 -06:00 committed by oandrew
parent bfc610275d
commit 9da0df4f03
4 changed files with 34 additions and 8 deletions

View File

@ -198,6 +198,8 @@ struct Settings : public SettingsCollection<Settings>
M(SettingUInt64, output_format_pretty_max_column_pad_width, 250, "Maximum width to pad all values in a column in Pretty formats.", 0) \
M(SettingBool, output_format_pretty_color, true, "Use ANSI escape sequences to paint colors in Pretty formats", 0) \
M(SettingUInt64, output_format_parquet_row_group_size, 1000000, "Row group size in rows.", 0) \
M(SettingString, output_format_avro_codec, "", "Compression codec used for output. Possible values: 'null', 'deflate', 'snappy'.", 0) \
M(SettingUInt64, output_format_avro_sync_interval, 16 * 1024, "Sync interval in bytes.", 0) \
\
M(SettingBool, use_client_time_zone, false, "Use client timezone for interpreting DateTime string values, instead of adopting server timezone.", 0) \
\

View File

@ -100,6 +100,8 @@ static FormatSettings getOutputFormatSetting(const Settings & settings, const Co
format_settings.custom.row_before_delimiter = settings.format_custom_row_before_delimiter;
format_settings.custom.row_after_delimiter = settings.format_custom_row_after_delimiter;
format_settings.custom.row_between_delimiter = settings.format_custom_row_between_delimiter;
format_settings.avro.output_codec = settings.output_format_avro_codec;
format_settings.avro.output_sync_interval = settings.output_format_avro_sync_interval;
return format_settings;
}

View File

@ -114,7 +114,11 @@ struct FormatSettings
struct Avro
{
String schema_registry_url;
} avro;
String output_codec;
UInt64 output_sync_interval = 16 * 1024;
};
Avro avro;
};

View File

@ -42,12 +42,6 @@
#include <avro/ValidSchema.hh>
#include <avro/Writer.hh>
#define DEFAULT_SYNC_INTERVAL 16*1024
#ifdef SNAPPY_CODEC_AVAILABLE
#define DEFAULT_CODEC avro::Codec::SNAPPY_CODEC
#else
#define DEFAULT_CODEC avro::Codec::DEFLATE_CODEC
#endif
namespace DB
{
@ -294,12 +288,36 @@ void AvroSerializer::serializeRow(const Columns & columns, size_t row_num, avro:
}
}
static avro::Codec getCodec(const std::string& codec_name)
{
if (codec_name == "")
{
#ifdef SNAPPY_CODEC_AVAILABLE
return avro::Codec::SNAPPY_CODEC;
#else
return avro::Codec::DEFLATE_CODEC;
#endif
}
if (codec_name == "null") return avro::Codec::NULL_CODEC;
if (codec_name == "deflate") return avro::Codec::DEFLATE_CODEC;
#ifdef SNAPPY_CODEC_AVAILABLE
if (codec_name == "snappy") return avro::Codec::SNAPPY_CODEC;
#endif
throw Exception("Avro codec " + codec_name + " is not available", ErrorCodes::BAD_ARGUMENTS);
}
AvroRowOutputFormat::AvroRowOutputFormat(
WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback, const FormatSettings & settings_)
: IRowOutputFormat(header_, out_, callback)
, settings(settings_)
, serializer(header_.getColumnsWithTypeAndName())
, file_writer(std::make_unique<OutputStreamWriteBufferAdapter>(out_), serializer.getSchema(), DEFAULT_SYNC_INTERVAL, DEFAULT_CODEC)
, file_writer(
std::make_unique<OutputStreamWriteBufferAdapter>(out_),
serializer.getSchema(),
settings.avro.output_sync_interval,
getCodec(settings.avro.output_codec))
{
}