Add ability to choose codecs for storage log and tiny log

This commit is contained in:
alesapin 2019-01-21 17:00:06 +03:00
parent 5d3acdafe6
commit 34fb1c89f1
6 changed files with 165 additions and 5 deletions

View File

@ -240,6 +240,12 @@ CompressionCodecPtr ColumnsDescription::getCodecOrDefault(const String & column_
return codec->second;
}
/// Single-argument overload: forwards to the two-argument getCodecOrDefault,
/// supplying the codec factory's default codec as the fallback value.
CompressionCodecPtr ColumnsDescription::getCodecOrDefault(const String & column_name) const
{
    auto && codec_factory = CompressionCodecFactory::instance();
    return getCodecOrDefault(column_name, codec_factory.getDefaultCodec());
}
ColumnsDescription ColumnsDescription::parse(const String & str)
{
ReadBufferFromString buf{str};

View File

@ -69,6 +69,8 @@ struct ColumnsDescription
/// Looks up the codec for `column_name`; `default_codec` is presumably what is
/// returned when the column has no codec entry (the definition is only partly visible here — confirm).
CompressionCodecPtr getCodecOrDefault(const String & column_name, CompressionCodecPtr default_codec) const;
/// Convenience overload: same lookup, with CompressionCodecFactory's default codec used as `default_codec`.
CompressionCodecPtr getCodecOrDefault(const String & column_name) const;
static ColumnsDescription parse(const String & str);
static const ColumnsDescription * loadFromContext(const Context & context, const String & db, const String & table);

View File

@ -144,9 +144,9 @@ private:
struct Stream
{
Stream(const std::string & data_path, size_t max_compress_block_size) :
Stream(const std::string & data_path, CompressionCodecPtr codec, size_t max_compress_block_size) :
plain(data_path, max_compress_block_size, O_APPEND | O_CREAT | O_WRONLY),
compressed(plain, CompressionCodecFactory::instance().getDefaultCodec(), max_compress_block_size)
compressed(plain, codec, max_compress_block_size)
{
plain_offset = Poco::File(data_path).getSize();
}
@ -355,7 +355,12 @@ void LogBlockOutputStream::writeData(const String & name, const IDataType & type
if (written_streams.count(stream_name))
return;
streams.try_emplace(stream_name, storage.files[stream_name].data_file.path(), storage.max_compress_block_size);
const auto & columns = storage.getColumns();
streams.try_emplace(
stream_name,
storage.files[stream_name].data_file.path(),
columns.getCodecOrDefault(name),
storage.max_compress_block_size);
}, settings.path);
settings.getter = createStreamGetter(name, written_streams);

View File

@ -135,9 +135,9 @@ private:
struct Stream
{
Stream(const std::string & data_path, size_t max_compress_block_size) :
Stream(const std::string & data_path, CompressionCodecPtr codec, size_t max_compress_block_size) :
plain(data_path, max_compress_block_size, O_APPEND | O_CREAT | O_WRONLY),
compressed(plain, CompressionCodecFactory::instance().getDefaultCodec(), max_compress_block_size)
compressed(plain, codec, max_compress_block_size)
{
}
@ -237,6 +237,7 @@ void TinyLogBlockInputStream::readData(const String & name, const IDataType & ty
IDataType::OutputStreamGetter TinyLogBlockOutputStream::createStreamGetter(const String & name,
WrittenStreams & written_streams)
{
return [&] (const IDataType::SubstreamPath & path) -> WriteBuffer *
{
String stream_name = IDataType::getFileNameForStream(name, path);
@ -244,8 +245,10 @@ IDataType::OutputStreamGetter TinyLogBlockOutputStream::createStreamGetter(const
if (!written_streams.insert(stream_name).second)
return nullptr;
const auto & columns = storage.getColumns();
if (!streams.count(stream_name))
streams[stream_name] = std::make_unique<Stream>(storage.files[stream_name].data_file.path(),
columns.getCodecOrDefault(name),
storage.max_compress_block_size);
return &streams[stream_name]->compressed;

View File

@ -0,0 +1,26 @@
CREATE TABLE test.compression_codec_log ( id UInt64 CODEC(LZ4), data String CODEC(ZSTD(1)), ddd Date CODEC(NONE), somenum Float64 CODEC(ZSTD(2)), somestr FixedString(3) CODEC(LZ4HC(7)), othernum Int64 CODEC(Delta(8))) ENGINE = Log()
1 hello 2018-12-14 1.1 aaa 5
2 world 2018-12-15 2.2 bbb 6
3 ! 2018-12-16 3.3 ccc 7
2
CREATE TABLE test.compression_codec_multiple_log ( id UInt64 CODEC(LZ4, ZSTD(1), NONE, LZ4HC(0), Delta(4)), data String CODEC(ZSTD(2), NONE, Delta(2), LZ4HC(0), LZ4, LZ4, Delta(8)), ddd Date CODEC(NONE, NONE, NONE, Delta(1), LZ4, ZSTD(1), LZ4HC(0), LZ4HC(0)), somenum Float64 CODEC(Delta(4), LZ4, LZ4, ZSTD(2), LZ4HC(5), ZSTD(3), ZSTD(1))) ENGINE = Log()
1 world 2018-10-05 1.1
2 hello 2018-10-01 2.2
3 buy 2018-10-11 3.3
10003
10003
274972506.6
9175437371954010821
CREATE TABLE test.compression_codec_tiny_log ( id UInt64 CODEC(LZ4), data String CODEC(ZSTD(1)), ddd Date CODEC(NONE), somenum Float64 CODEC(ZSTD(2)), somestr FixedString(3) CODEC(LZ4HC(7)), othernum Int64 CODEC(Delta(8))) ENGINE = TinyLog()
1 hello 2018-12-14 1.1 aaa 5
2 world 2018-12-15 2.2 bbb 6
3 ! 2018-12-16 3.3 ccc 7
2
CREATE TABLE test.compression_codec_multiple_tiny_log ( id UInt64 CODEC(LZ4, ZSTD(1), NONE, LZ4HC(0), Delta(4)), data String CODEC(ZSTD(2), NONE, Delta(2), LZ4HC(0), LZ4, LZ4, Delta(8)), ddd Date CODEC(NONE, NONE, NONE, Delta(1), LZ4, ZSTD(1), LZ4HC(0), LZ4HC(0)), somenum Float64 CODEC(Delta(4), LZ4, LZ4, ZSTD(2), LZ4HC(5), ZSTD(3), ZSTD(1))) ENGINE = TinyLog()
1 world 2018-10-05 1.1
2 hello 2018-10-01 2.2
3 buy 2018-10-11 3.3
10003
10003
274972506.6
9175437371954010821

View File

@ -0,0 +1,118 @@
-- Presumably keeps server-side log messages out of the client output so that
-- it can be compared against the reference file (confirm against the test runner).
SET send_logs_level = 'none';
-- copy-paste for storage log
-- Part 1: Log engine, one codec per column.
DROP TABLE IF EXISTS test.compression_codec_log;
-- ZSTD and Delta are deliberately written without arguments here; the reference
-- output expects SHOW CREATE TABLE to print them as ZSTD(1) and Delta(8).
CREATE TABLE test.compression_codec_log(
id UInt64 CODEC(LZ4),
data String CODEC(ZSTD),
ddd Date CODEC(NONE),
somenum Float64 CODEC(ZSTD(2)),
somestr FixedString(3) CODEC(LZ4HC(7)),
othernum Int64 CODEC(Delta)
) ENGINE = Log();
SHOW CREATE TABLE test.compression_codec_log;
-- Three separate single-row inserts, then read everything back.
INSERT INTO test.compression_codec_log VALUES(1, 'hello', toDate('2018-12-14'), 1.1, 'aaa', 5);
INSERT INTO test.compression_codec_log VALUES(2, 'world', toDate('2018-12-15'), 2.2, 'bbb', 6);
INSERT INTO test.compression_codec_log VALUES(3, '!', toDate('2018-12-16'), 3.3, 'ccc', 7);
SELECT * FROM test.compression_codec_log ORDER BY id;
-- Add a second row with id = 2 (with an empty string value), then detach and
-- re-attach the table so the data has to be read back after a reload.
INSERT INTO test.compression_codec_log VALUES(2, '', toDate('2018-12-13'), 4.4, 'ddd', 8);
DETACH TABLE test.compression_codec_log;
ATTACH TABLE test.compression_codec_log;
-- Both id = 2 rows must still be there: expected result is 2.
SELECT count(*) FROM test.compression_codec_log WHERE id = 2 GROUP BY id;
DROP TABLE IF EXISTS test.compression_codec_log;
-- Part 2: Log engine, chains of several codecs per column (including repeated
-- and NONE entries in the chain).
DROP TABLE IF EXISTS test.compression_codec_multiple_log;
CREATE TABLE test.compression_codec_multiple_log (
id UInt64 CODEC(LZ4, ZSTD, NONE, LZ4HC, Delta(4)),
data String CODEC(ZSTD(2), NONE, Delta(2), LZ4HC, LZ4, LZ4, Delta(8)),
ddd Date CODEC(NONE, NONE, NONE, Delta(1), LZ4, ZSTD, LZ4HC, LZ4HC),
somenum Float64 CODEC(Delta(4), LZ4, LZ4, ZSTD(2), LZ4HC(5), ZSTD(3), ZSTD)
) ENGINE = Log();
-- Argument-less ZSTD / LZ4HC are expected to be shown as ZSTD(1) / LZ4HC(0).
SHOW CREATE TABLE test.compression_codec_multiple_log;
INSERT INTO test.compression_codec_multiple_log VALUES (1, 'world', toDate('2018-10-05'), 1.1), (2, 'hello', toDate('2018-10-01'), 2.2), (3, 'buy', toDate('2018-10-11'), 3.3);
SELECT * FROM test.compression_codec_multiple_log ORDER BY id;
-- Bulk insert of 10000 generated rows on top of the 3 literal ones.
INSERT INTO test.compression_codec_multiple_log select modulo(number, 100), toString(number), toDate('2018-12-01'), 5.5 * number FROM system.numbers limit 10000;
-- 3 + 10000 rows => 10003; every `data` value is unique => 10003 distinct.
SELECT count(*) FROM test.compression_codec_multiple_log;
SELECT count(distinct data) FROM test.compression_codec_multiple_log;
-- 5.5 * (0 + 1 + ... + 9999) + (1.1 + 2.2 + 3.3) = 274972506.6
SELECT floor(sum(somenum), 1) FROM test.compression_codec_multiple_log;
-- Empty the table, write the generated rows again and checksum the full
-- contents; the hash sum is compared against the reference output.
TRUNCATE TABLE test.compression_codec_multiple_log;
INSERT INTO test.compression_codec_multiple_log select modulo(number, 100), toString(number), toDate('2018-12-01'), 5.5 * number FROM system.numbers limit 10000;
SELECT sum(cityHash64(*)) FROM test.compression_codec_multiple_log;
-- copy-paste for storage tiny log
-- Part 1: TinyLog engine, one codec per column.
DROP TABLE IF EXISTS test.compression_codec_tiny_log;
-- ZSTD and Delta are deliberately written without arguments here; the reference
-- output expects SHOW CREATE TABLE to print them as ZSTD(1) and Delta(8).
CREATE TABLE test.compression_codec_tiny_log(
id UInt64 CODEC(LZ4),
data String CODEC(ZSTD),
ddd Date CODEC(NONE),
somenum Float64 CODEC(ZSTD(2)),
somestr FixedString(3) CODEC(LZ4HC(7)),
othernum Int64 CODEC(Delta)
) ENGINE = TinyLog();
SHOW CREATE TABLE test.compression_codec_tiny_log;
-- Three separate single-row inserts, then read everything back.
INSERT INTO test.compression_codec_tiny_log VALUES(1, 'hello', toDate('2018-12-14'), 1.1, 'aaa', 5);
INSERT INTO test.compression_codec_tiny_log VALUES(2, 'world', toDate('2018-12-15'), 2.2, 'bbb', 6);
INSERT INTO test.compression_codec_tiny_log VALUES(3, '!', toDate('2018-12-16'), 3.3, 'ccc', 7);
SELECT * FROM test.compression_codec_tiny_log ORDER BY id;
-- Add a second row with id = 2 (with an empty string value), then detach and
-- re-attach the table so the data has to be read back after a reload.
INSERT INTO test.compression_codec_tiny_log VALUES(2, '', toDate('2018-12-13'), 4.4, 'ddd', 8);
DETACH TABLE test.compression_codec_tiny_log;
ATTACH TABLE test.compression_codec_tiny_log;
-- Both id = 2 rows must still be there: expected result is 2.
SELECT count(*) FROM test.compression_codec_tiny_log WHERE id = 2 GROUP BY id;
DROP TABLE IF EXISTS test.compression_codec_tiny_log;
-- Part 2: TinyLog engine, chains of several codecs per column (including
-- repeated and NONE entries in the chain).
DROP TABLE IF EXISTS test.compression_codec_multiple_tiny_log;
CREATE TABLE test.compression_codec_multiple_tiny_log (
id UInt64 CODEC(LZ4, ZSTD, NONE, LZ4HC, Delta(4)),
data String CODEC(ZSTD(2), NONE, Delta(2), LZ4HC, LZ4, LZ4, Delta(8)),
ddd Date CODEC(NONE, NONE, NONE, Delta(1), LZ4, ZSTD, LZ4HC, LZ4HC),
somenum Float64 CODEC(Delta(4), LZ4, LZ4, ZSTD(2), LZ4HC(5), ZSTD(3), ZSTD)
) ENGINE = TinyLog();
-- Argument-less ZSTD / LZ4HC are expected to be shown as ZSTD(1) / LZ4HC(0).
SHOW CREATE TABLE test.compression_codec_multiple_tiny_log;
INSERT INTO test.compression_codec_multiple_tiny_log VALUES (1, 'world', toDate('2018-10-05'), 1.1), (2, 'hello', toDate('2018-10-01'), 2.2), (3, 'buy', toDate('2018-10-11'), 3.3);
SELECT * FROM test.compression_codec_multiple_tiny_log ORDER BY id;
-- Bulk insert of 10000 generated rows on top of the 3 literal ones.
INSERT INTO test.compression_codec_multiple_tiny_log select modulo(number, 100), toString(number), toDate('2018-12-01'), 5.5 * number FROM system.numbers limit 10000;
-- 3 + 10000 rows => 10003; every `data` value is unique => 10003 distinct.
SELECT count(*) FROM test.compression_codec_multiple_tiny_log;
SELECT count(distinct data) FROM test.compression_codec_multiple_tiny_log;
-- 5.5 * (0 + 1 + ... + 9999) + (1.1 + 2.2 + 3.3) = 274972506.6
SELECT floor(sum(somenum), 1) FROM test.compression_codec_multiple_tiny_log;
-- Empty the table, write the generated rows again and checksum the full
-- contents; the hash sum is compared against the reference output.
TRUNCATE TABLE test.compression_codec_multiple_tiny_log;
INSERT INTO test.compression_codec_multiple_tiny_log select modulo(number, 100), toString(number), toDate('2018-12-01'), 5.5 * number FROM system.numbers limit 10000;
SELECT sum(cityHash64(*)) FROM test.compression_codec_multiple_tiny_log;