This commit is contained in:
Evgeniy Gatov 2015-03-12 11:38:25 +03:00
commit fe43c43b6e
38 changed files with 4684 additions and 200 deletions

View File

@ -147,10 +147,10 @@ struct __attribute__((__packed__)) SingleValueDataString
static constexpr Int32 AUTOMATIC_STORAGE_SIZE = 64;
static constexpr Int32 MAX_SMALL_STRING_SIZE = AUTOMATIC_STORAGE_SIZE - sizeof(size);
union
union __attribute__((__aligned__(1)))
{
char small_data[MAX_SMALL_STRING_SIZE]; /// Включая завершающий ноль.
char * large_data;
char * __attribute__((__aligned__(1))) large_data;
};
~SingleValueDataString()

View File

@ -55,13 +55,15 @@ public:
Protocol::Compression::Enum compression_ = Protocol::Compression::Enable,
Poco::Timespan connect_timeout_ = Poco::Timespan(DBMS_DEFAULT_CONNECT_TIMEOUT_SEC, 0),
Poco::Timespan receive_timeout_ = Poco::Timespan(DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC, 0),
Poco::Timespan send_timeout_ = Poco::Timespan(DBMS_DEFAULT_SEND_TIMEOUT_SEC, 0))
Poco::Timespan send_timeout_ = Poco::Timespan(DBMS_DEFAULT_SEND_TIMEOUT_SEC, 0),
Poco::Timespan ping_timeout_ = Poco::Timespan(DBMS_DEFAULT_PING_TIMEOUT_SEC, 0))
:
host(host_), port(port_), default_database(default_database_),
user(user_), password(password_),
client_name(client_name_),
compression(compression_), data_type_factory(data_type_factory_),
connect_timeout(connect_timeout_), receive_timeout(receive_timeout_), send_timeout(send_timeout_),
ping_timeout(ping_timeout_),
log_wrapper(host, port)
{
/// Соединеняемся не сразу, а при первой необходимости.
@ -178,6 +180,7 @@ private:
Poco::Timespan connect_timeout;
Poco::Timespan receive_timeout;
Poco::Timespan send_timeout;
Poco::Timespan ping_timeout;
/// Откуда читать результат выполнения запроса.
SharedPtr<ReadBuffer> maybe_compressed_in;

View File

@ -18,10 +18,13 @@ public:
typedef PODArray<UInt8> Chars_t;
private:
/// Байты строк, уложенные подряд. Строки хранятся без завершающего нулевого байта.
/** NOTE Требуется, чтобы смещение и тип chars в объекте был таким же, как у data в ColumnVector<UInt8>.
* Это используется в функции packFixed (AggregationCommon.h)
*/
Chars_t chars;
/// Размер строк.
const size_t n;
/// Байты строк, уложенные подряд. Строки хранятся без завершающего нулевого байта.
Chars_t chars;
public:
/** Создать пустой столбец строк фиксированной длины n */

View File

@ -11,6 +11,7 @@
#define DBMS_DEFAULT_CONNECT_TIMEOUT_WITH_FAILOVER_MS 50
#define DBMS_DEFAULT_SEND_TIMEOUT_SEC 300
#define DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC 300
#define DBMS_DEFAULT_PING_TIMEOUT_SEC 5
#define DBMS_DEFAULT_POLL_INTERVAL 10
/// Насколько секунд можно максимально задерживать вставку в таблицу типа MergeTree, если в ней много недомердженных кусков.
@ -46,7 +47,9 @@
#define DEFAULT_INTERACTIVE_DELAY 100000
#define DBMS_DEFAULT_DISTRIBUTED_CONNECTIONS_POOL_SIZE 1024
#define DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES 3
#define DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_DECREASE_ERROR_PERIOD 300 /// каждый период уменьшаем счетчик ошибок в 2 раза
/// каждый период уменьшаем счетчик ошибок в 2 раза
/// слишком маленький период может приводить, что ошибки исчезают сразу после создания.
#define DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_DECREASE_ERROR_PERIOD (2*DBMS_DEFAULT_SEND_TIMEOUT_SEC)
#define DEFAULT_QUERIES_QUEUE_WAIT_TIME_MS 5000 /// Максимальное время ожидания в очереди запросов.
#define DBMS_DEFAULT_BACKGROUND_POOL_SIZE 6

View File

@ -277,6 +277,8 @@ namespace ErrorCodes
STORAGE_DOESNT_SUPPORT_PARALLEL_REPLICAS,
CPUID_ERROR,
INFINITE_LOOP,
CANNOT_COMPRESS,
CANNOT_DECOMPRESS,
POCO_EXCEPTION = 1000,
STD_EXCEPTION,

View File

@ -18,16 +18,6 @@ using Poco::SharedPtr;
class AggregatingBlockInputStream : public IProfilingBlockInputStream
{
public:
AggregatingBlockInputStream(BlockInputStreamPtr input_, const ColumnNumbers & keys_, AggregateDescriptions & aggregates_,
bool overflow_row_, bool final_, size_t max_rows_to_group_by_, OverflowMode group_by_overflow_mode_,
Compiler * compiler_, UInt32 min_count_to_compile_, size_t group_by_two_level_threshold_)
: aggregator(keys_, aggregates_, overflow_row_, max_rows_to_group_by_, group_by_overflow_mode_,
compiler_, min_count_to_compile_, group_by_two_level_threshold_),
final(final_)
{
children.push_back(input_);
}
/** keys берутся из GROUP BY части запроса
* Агрегатные функции ищутся везде в выражении.
* Столбцы, соответствующие keys и аргументам агрегатных функций, уже должны быть вычислены.

View File

@ -15,10 +15,8 @@ class CollapsingFinalBlockInputStream : public IProfilingBlockInputStream
{
public:
CollapsingFinalBlockInputStream(BlockInputStreams inputs_, const SortDescription & description_,
const String & sign_column_)
: description(description_), sign_column(sign_column_),
log(&Logger::get("CollapsingFinalBlockInputStream")),
first(true), count_positive(0), count_negative(0), count_incorrect_data(0), blocks_fetched(0), blocks_output(0)
const String & sign_column_name_)
: description(description_), sign_column_name(sign_column_name_)
{
children.insert(children.end(), inputs_.begin(), inputs_.end());
}
@ -40,7 +38,7 @@ public:
for (size_t i = 0; i < description.size(); ++i)
res << ", " << description[i].getID();
res << ", sign_column, " << sign_column << ")";
res << ", sign_column, " << sign_column_name << ")";
return res.str();
}
@ -55,10 +53,10 @@ private:
{
MergingBlock(Block block_,
size_t stream_index_,
SortDescription desc_,
const SortDescription & desc,
String sign_column_name,
BlockPlainPtrs * output_blocks)
: block(block_), stream_index(stream_index_), desc(desc_), refcount(0), output_blocks(output_blocks)
: block(block_), stream_index(stream_index_), output_blocks(output_blocks)
{
sort_columns.resize(desc.size());
for (size_t i = 0; i < desc.size(); ++i)
@ -86,8 +84,6 @@ private:
/// Строки с одинаковым ключом будут упорядочены по возрастанию stream_index.
size_t stream_index;
SortDescription desc;
size_t rows;
/// Какие строки нужно оставить. Заполняется при слиянии потоков.
@ -98,7 +94,7 @@ private:
const ColumnInt8 * sign_column;
/// Когда достигает нуля, блок можно выдавать в ответ.
int refcount;
int refcount = 0;
/// Куда положить блок, когда он готов попасть в ответ.
BlockPlainPtrs * output_blocks;
@ -181,17 +177,17 @@ private:
Cursor() {}
explicit Cursor(MergingBlockPtr block_, size_t pos_ = 0) : block(block_), pos(pos_) {}
bool operator<(const Cursor & rhs) const
bool operator< (const Cursor & rhs) const
{
for (size_t i = 0; i < block->sort_columns.size(); ++i)
{
int direction = block->desc[i].direction;
int res = direction * block->sort_columns[i]->compareAt(pos, rhs.pos, *(rhs.block->sort_columns[i]), direction);
int res = block->sort_columns[i]->compareAt(pos, rhs.pos, *(rhs.block->sort_columns[i]), 1);
if (res > 0)
return true;
if (res < 0)
return false;
}
return block->stream_index > rhs.block->stream_index;
}
@ -203,7 +199,7 @@ private:
for (size_t i = 0; i < block->sort_columns.size(); ++i)
{
int res = block->desc[i].direction * block->sort_columns[i]->compareAt(pos, rhs.pos, *(rhs.block->sort_columns[i]), 1);
int res = block->sort_columns[i]->compareAt(pos, rhs.pos, *(rhs.block->sort_columns[i]), 1);
if (res != 0)
return false;
}
@ -235,12 +231,12 @@ private:
typedef std::priority_queue<Cursor> Queue;
SortDescription description;
String sign_column;
const SortDescription description;
String sign_column_name;
Logger * log;
Logger * log = &Logger::get("CollapsingFinalBlockInputStream");
bool first;
bool first = true;
BlockPlainPtrs output_blocks;
@ -249,15 +245,15 @@ private:
Cursor previous; /// Текущий первичный ключ.
Cursor last_positive; /// Последняя положительная строка для текущего первичного ключа.
size_t count_positive; /// Количество положительных строк для текущего первичного ключа.
size_t count_negative; /// Количество отрицательных строк для текущего первичного ключа.
bool last_is_positive; /// true, если последняя строка для текущего первичного ключа положительная.
size_t count_positive = 0; /// Количество положительных строк для текущего первичного ключа.
size_t count_negative = 0; /// Количество отрицательных строк для текущего первичного ключа.
bool last_is_positive = false; /// true, если последняя строка для текущего первичного ключа положительная.
size_t count_incorrect_data; /// Чтобы не писать в лог слишком много сообщений об ошибке.
size_t count_incorrect_data = 0; /// Чтобы не писать в лог слишком много сообщений об ошибке.
/// Посчитаем, сколько блоков получили на вход и отдали на выход.
size_t blocks_fetched;
size_t blocks_output;
size_t blocks_fetched = 0;
size_t blocks_output = 0;
void fetchNextBlock(size_t input_index);
void commitCurrent();

View File

@ -16,14 +16,6 @@ using Poco::SharedPtr;
class MergingAggregatedBlockInputStream : public IProfilingBlockInputStream
{
public:
MergingAggregatedBlockInputStream(BlockInputStreamPtr input_, const ColumnNumbers & keys_,
const AggregateDescriptions & aggregates_, bool overflow_row_, bool final_, size_t max_threads_)
: aggregator(keys_, aggregates_, overflow_row_, 0, OverflowMode::THROW, nullptr, 0, 0),
final(final_), max_threads(max_threads_)
{
children.push_back(input_);
}
MergingAggregatedBlockInputStream(BlockInputStreamPtr input_, const Names & keys_names_,
const AggregateDescriptions & aggregates_, bool overflow_row_, bool final_, size_t max_threads_)
: aggregator(keys_names_, aggregates_, overflow_row_, 0, OverflowMode::THROW, nullptr, 0, 0),

View File

@ -19,19 +19,6 @@ using Poco::SharedPtr;
class ParallelAggregatingBlockInputStream : public IProfilingBlockInputStream
{
public:
ParallelAggregatingBlockInputStream(BlockInputStreams inputs, const ColumnNumbers & keys_,
AggregateDescriptions & aggregates_, bool overflow_row_, bool final_, size_t max_threads_,
size_t max_rows_to_group_by_, OverflowMode group_by_overflow_mode_,
Compiler * compiler_, UInt32 min_count_to_compile_, size_t group_by_two_level_threshold_)
: aggregator(keys_, aggregates_, overflow_row_, max_rows_to_group_by_, group_by_overflow_mode_,
compiler_, min_count_to_compile_, group_by_two_level_threshold_),
final(final_), max_threads(std::min(inputs.size(), max_threads_)),
keys_size(keys_.size()), aggregates_size(aggregates_.size()),
handler(*this), processor(inputs, max_threads, handler)
{
children.insert(children.end(), inputs.begin(), inputs.end());
}
/** Столбцы из key_names и аргументы агрегатных функций, уже должны быть вычислены.
*/
ParallelAggregatingBlockInputStream(BlockInputStreams inputs, const Names & key_names,
@ -41,7 +28,7 @@ public:
: aggregator(key_names, aggregates, overflow_row_, max_rows_to_group_by_, group_by_overflow_mode_,
compiler_, min_count_to_compile_, group_by_two_level_threshold_),
final(final_), max_threads(std::min(inputs.size(), max_threads_)),
keys_size(key_names.size()), aggregates_size(aggregates.size()),
keys_size(aggregator.getNumberOfKeys()), aggregates_size(aggregator.getNumberOfAggregates()),
handler(*this), processor(inputs, max_threads, handler)
{
children.insert(children.end(), inputs.begin(), inputs.end());

View File

@ -53,12 +53,13 @@ private:
owned_cell.reset(new UncompressedCacheCell);
size_t size_decompressed;
owned_cell->compressed_size = readCompressedData(size_decompressed);
size_t size_compressed_without_checksum;
owned_cell->compressed_size = readCompressedData(size_decompressed, size_compressed_without_checksum);
if (owned_cell->compressed_size)
{
owned_cell->data.resize(size_decompressed);
decompress(owned_cell->data.m_data, size_decompressed);
decompress(owned_cell->data.m_data, size_decompressed, size_compressed_without_checksum);
/// Положим данные в кэш.
cache->set(key, owned_cell);

View File

@ -14,14 +14,15 @@ private:
bool nextImpl()
{
size_t size_decompressed;
size_compressed = readCompressedData(size_decompressed);
size_t size_compressed_without_checksum;
size_compressed = readCompressedData(size_decompressed, size_compressed_without_checksum);
if (!size_compressed)
return false;
memory.resize(size_decompressed);
working_buffer = Buffer(&memory[0], &memory[size_decompressed]);
decompress(working_buffer.begin(), size_decompressed);
decompress(working_buffer.begin(), size_decompressed, size_compressed_without_checksum);
return true;
}
@ -44,14 +45,15 @@ public:
while (bytes_read < n)
{
size_t size_decompressed;
size_t size_compressed_without_checksum;
if (!readCompressedData(size_decompressed))
if (!readCompressedData(size_decompressed, size_compressed_without_checksum))
return bytes_read;
/// Если разжатый блок помещается целиком туда, куда его надо скопировать.
if (size_decompressed <= n - bytes_read)
{
decompress(to + bytes_read, size_decompressed);
decompress(to + bytes_read, size_decompressed, size_compressed_without_checksum);
bytes_read += size_decompressed;
bytes += size_decompressed;
}
@ -62,7 +64,7 @@ public:
working_buffer = Buffer(&memory[0], &memory[size_decompressed]);
pos = working_buffer.begin();
decompress(working_buffer.begin(), size_decompressed);
decompress(working_buffer.begin(), size_decompressed, size_compressed_without_checksum);
bytes_read += read(to + bytes_read, n - bytes_read);
break;

View File

@ -5,6 +5,7 @@
#include <city.h>
#include <quicklz/quicklz_level1.h>
#include <lz4/lz4.h>
#include <zstd/zstd.h>
#include <DB/Common/PODArray.h>
#include <DB/Common/ProfileEvents.h>
@ -32,7 +33,7 @@ protected:
/// Прочитать сжатые данные в compressed_buffer. Достать из их заголовка размер разжатых данных. Проверить чексумму.
/// Возвращает количество прочитанных байт.
size_t readCompressedData(size_t & size_decompressed)
size_t readCompressedData(size_t & size_decompressed, size_t & size_compressed_without_checksum)
{
if (compressed_in->eof())
return 0;
@ -44,14 +45,15 @@ protected:
compressed_in->readStrict(&own_compressed_buffer[0], QUICKLZ_HEADER_SIZE);
UInt8 method = own_compressed_buffer[0]; /// См. CompressedWriteBuffer.h
size_t size_compressed;
size_t & size_compressed = size_compressed_without_checksum;
if (method < 0x80)
{
size_compressed = qlz_size_compressed(&own_compressed_buffer[0]);
size_decompressed = qlz_size_decompressed(&own_compressed_buffer[0]);
}
else if (method == 0x82)
else if (method == static_cast<UInt8>(CompressionMethodByte::LZ4) || method == static_cast<UInt8>(CompressionMethodByte::ZSTD))
{
size_compressed = *reinterpret_cast<const UInt32 *>(&own_compressed_buffer[1]);
size_decompressed = *reinterpret_cast<const UInt32 *>(&own_compressed_buffer[5]);
@ -85,7 +87,7 @@ protected:
return size_compressed + sizeof(checksum);
}
void decompress(char * to, size_t size_decompressed)
void decompress(char * to, size_t size_decompressed, size_t size_compressed_without_checksum)
{
ProfileEvents::increment(ProfileEvents::CompressedReadBufferBlocks);
ProfileEvents::increment(ProfileEvents::CompressedReadBufferBytes, size_decompressed);
@ -99,10 +101,19 @@ protected:
qlz_decompress(&compressed_buffer[0], to, qlz_state);
}
else if (method == 0x82)
else if (method == static_cast<UInt8>(CompressionMethodByte::LZ4))
{
if (LZ4_decompress_fast(&compressed_buffer[QUICKLZ_HEADER_SIZE], to, size_decompressed) < 0)
throw Exception("Cannot LZ4_decompress_fast", ErrorCodes::CORRUPTED_DATA);
throw Exception("Cannot LZ4_decompress_fast", ErrorCodes::CANNOT_DECOMPRESS);
}
else if (method == static_cast<UInt8>(CompressionMethodByte::ZSTD))
{
size_t res = ZSTD_decompress(
to, size_decompressed,
&compressed_buffer[QUICKLZ_HEADER_SIZE], size_compressed_without_checksum - QUICKLZ_HEADER_SIZE);
if (ZSTD_isError(res))
throw Exception("Cannot ZSTD_decompress: " + std::string(ZSTD_getErrorName(res)), ErrorCodes::CANNOT_DECOMPRESS);
}
else
throw Exception("Unknown compression method: " + toString(method), ErrorCodes::UNKNOWN_COMPRESSION_METHOD);

View File

@ -24,14 +24,15 @@ private:
bool nextImpl()
{
size_t size_decompressed;
size_compressed = readCompressedData(size_decompressed);
size_t size_compressed_without_checksum;
size_compressed = readCompressedData(size_decompressed, size_compressed_without_checksum);
if (!size_compressed)
return false;
memory.resize(size_decompressed);
working_buffer = Buffer(&memory[0], &memory[size_decompressed]);
decompress(working_buffer.begin(), size_decompressed);
decompress(working_buffer.begin(), size_decompressed, size_compressed_without_checksum);
return true;
}
@ -81,8 +82,9 @@ public:
while (bytes_read < n)
{
size_t size_decompressed = 0;
size_t size_compressed_without_checksum = 0;
size_t new_size_compressed = readCompressedData(size_decompressed);
size_t new_size_compressed = readCompressedData(size_decompressed, size_compressed_without_checksum);
size_compressed = 0; /// file_in больше не указывает на конец блока в working_buffer.
if (!new_size_compressed)
return bytes_read;
@ -90,7 +92,7 @@ public:
/// Если разжатый блок помещается целиком туда, куда его надо скопировать.
if (size_decompressed <= n - bytes_read)
{
decompress(to + bytes_read, size_decompressed);
decompress(to + bytes_read, size_decompressed, size_compressed_without_checksum);
bytes_read += size_decompressed;
bytes += size_decompressed;
}
@ -102,7 +104,7 @@ public:
working_buffer = Buffer(&memory[0], &memory[size_decompressed]);
pos = working_buffer.begin();
decompress(working_buffer.begin(), size_decompressed);
decompress(working_buffer.begin(), size_decompressed, size_compressed_without_checksum);
bytes_read += read(to + bytes_read, n - bytes_read);
break;

View File

@ -11,15 +11,48 @@
namespace DB
{
namespace CompressionMethod
/** Метод сжатия */
enum class CompressionMethod
{
/** Метод сжатия */
enum Enum
{
QuickLZ,
LZ4,
LZ4HC, /// Формат такой же, как у LZ4. Разница только при сжатии.
};
}
QuickLZ,
LZ4,
LZ4HC, /// Формат такой же, как у LZ4. Разница только при сжатии.
ZSTD, /// Экспериментальный алгоритм: https://github.com/Cyan4973/zstd
};
/** Формат сжатого блока следующий:
*
* Первые 16 байт - чексумма от всех остальных байт блока. Сейчас используется только CityHash128.
* В дальнейшем можно предусмотреть другие чексуммы, хотя сделать их другого размера не получится.
*
* Следующий байт определяет алгоритм сжатия. Далее всё зависит от алгоритма.
*
* Первые 4 варианта совместимы с QuickLZ level 1.
* То есть, если значение первого байта < 4, для разжатия достаточно использовать функцию qlz_level1_decompress.
*
* 0x00 - несжатые данные, маленький блок. Далее один байт - размер сжатых данных, с учётом заголовка; один байт - размер несжатых данных.
* 0x01 - сжатые данные, QuickLZ level 1, маленький блок. Далее два байта аналогично.
* 0x02 - несжатые данные, большой блок. Далее 4 байта - размер сжатых данных, с учётом заголовка; 4 байта - размер несжатых данных.
* 0x03 - сжатые данные, QuickLZ level 1, большой блок. Далее 8 байт аналогично.
*
* 0x82 - LZ4 или LZ4HC (они имеют одинаковый формат).
* Далее 4 байта - размер сжатых данных, с учётом заголовка; 4 байта - размер несжатых данных.
*
* NOTE: Почему 0x82?
* Изначально использовался только QuickLZ. Потом был добавлен LZ4.
* Старший бит выставлен, чтобы отличить от QuickLZ, а второй бит выставлен для совместимости,
* чтобы работали функции qlz_size_compressed, qlz_size_decompressed.
* Хотя сейчас такая совместимость уже не актуальна.
*
* 0x90 - ZSTD
*
* Все размеры - little endian.
*/
enum class CompressionMethodByte : uint8_t
{
LZ4 = 0x82,
ZSTD = 0x90,
};
}

View File

@ -8,6 +8,7 @@
#include <quicklz/quicklz_level1.h>
#include <lz4/lz4.h>
#include <lz4/lz4hc.h>
#include <zstd/zstd.h>
#include <DB/Common/PODArray.h>
#include <DB/Core/Types.h>
@ -24,7 +25,7 @@ class CompressedWriteBuffer : public BufferWithOwnMemory<WriteBuffer>
{
private:
WriteBuffer & out;
CompressionMethod::Enum method;
CompressionMethod method;
PODArray<char> compressed_buffer;
qlz_state_compress * qlz_state;
@ -38,31 +39,7 @@ private:
size_t compressed_size = 0;
char * compressed_buffer_ptr = nullptr;
/** Формат сжатого блока следующий:
*
* Первые 16 байт - чексумма от всех остальных байт блока. Сейчас используется только CityHash128.
* В дальнейшем можно предусмотреть другие чексуммы, хотя сделать их другого размера не получится.
*
* Следующий байт определяет алгоритм сжатия. Далее всё зависит от алгоритма.
*
* Первые 4 варианта совместимы с QuickLZ level 1.
* То есть, если значение первого байта < 4, для разжатия достаточно использовать функцию qlz_level1_decompress.
*
* 0x00 - несжатые данные, маленький блок. Далее один байт - размер сжатых данных, с учётом заголовка; один байт - размер несжатых данных.
* 0x01 - сжатые данные, QuickLZ level 1, маленький блок. Далее два байта аналогично.
* 0x02 - несжатые данные, большой блок. Далее 4 байта - размер сжатых данных, с учётом заголовка; 4 байта - размер несжатых данных.
* 0x03 - сжатые данные, QuickLZ level 1, большой блок. Далее 8 байт аналогично.
*
* 0x82 - LZ4 или LZ4HC (они имеют одинаковый формат).
* Далее 4 байта - размер сжатых данных, с учётом заголовка; 4 байта - размер несжатых данных.
*
* NOTE: Почему 0x82?
* Изначально использовался только QuickLZ. Потом был добавлен LZ4.
* Старший бит выставлен, чтобы отличить от QuickLZ, а второй бит выставлен для совместимости,
* чтобы работали функции qlz_size_compressed, qlz_size_decompressed.
* Хотя сейчас такая совместимость уже не актуальна.
*
* Все размеры - little endian.
/** Формат сжатого блока - см. CompressedStream.h
*/
switch (method)
@ -88,7 +65,7 @@ private:
compressed_buffer.resize(header_size + LZ4_COMPRESSBOUND(uncompressed_size));
compressed_buffer[0] = 0x82; /// Второй бит - для совместимости с QuickLZ - обозначает, что размеры записываются 4 байтами.
compressed_buffer[0] = static_cast<UInt8>(CompressionMethodByte::LZ4);
if (method == CompressionMethod::LZ4)
compressed_size = header_size + LZ4_compress(
@ -110,6 +87,34 @@ private:
compressed_buffer_ptr = &compressed_buffer[0];
break;
}
case CompressionMethod::ZSTD:
{
static constexpr size_t header_size = 1 + sizeof(UInt32) + sizeof(UInt32);
compressed_buffer.resize(header_size + ZSTD_compressBound(uncompressed_size));
compressed_buffer[0] = static_cast<UInt8>(CompressionMethodByte::ZSTD);
size_t res = ZSTD_compress(
&compressed_buffer[header_size],
compressed_buffer.size(),
working_buffer.begin(),
uncompressed_size);
if (ZSTD_isError(res))
throw Exception("Cannot compress block with ZSTD: " + std::string(ZSTD_getErrorName(res)), ErrorCodes::CANNOT_COMPRESS);
compressed_size = header_size + res;
UInt32 compressed_size_32 = compressed_size;
UInt32 uncompressed_size_32 = uncompressed_size;
memcpy(&compressed_buffer[1], reinterpret_cast<const char *>(&compressed_size_32), sizeof(compressed_size_32));
memcpy(&compressed_buffer[5], reinterpret_cast<const char *>(&uncompressed_size_32), sizeof(uncompressed_size_32));
compressed_buffer_ptr = &compressed_buffer[0];
break;
}
default:
throw Exception("Unknown compression method", ErrorCodes::UNKNOWN_COMPRESSION_METHOD);
}
@ -123,7 +128,7 @@ private:
public:
CompressedWriteBuffer(
WriteBuffer & out_,
CompressionMethod::Enum method_ = CompressionMethod::LZ4,
CompressionMethod method_ = CompressionMethod::LZ4,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE)
: BufferWithOwnMemory<WriteBuffer>(buf_size), out(out_), method(method_), qlz_state(new qlz_state_compress) {}

View File

@ -676,19 +676,6 @@ APPLY_FOR_AGGREGATED_VARIANTS(M)
class Aggregator
{
public:
Aggregator(const ColumnNumbers & keys_, const AggregateDescriptions & aggregates_, bool overflow_row_,
size_t max_rows_to_group_by_, OverflowMode group_by_overflow_mode_, Compiler * compiler_, UInt32 min_count_to_compile_,
size_t group_by_two_level_threshold_)
: keys(keys_), aggregates(aggregates_), aggregates_size(aggregates.size()),
overflow_row(overflow_row_),
max_rows_to_group_by(max_rows_to_group_by_), group_by_overflow_mode(group_by_overflow_mode_),
compiler(compiler_), min_count_to_compile(min_count_to_compile_), group_by_two_level_threshold(group_by_two_level_threshold_)
{
std::sort(keys.begin(), keys.end());
keys.erase(std::unique(keys.begin(), keys.end()), keys.end());
keys_size = keys.size();
}
Aggregator(const Names & key_names_, const AggregateDescriptions & aggregates_, bool overflow_row_,
size_t max_rows_to_group_by_, OverflowMode group_by_overflow_mode_, Compiler * compiler_, UInt32 min_count_to_compile_,
size_t group_by_two_level_threshold_)
@ -739,6 +726,9 @@ public:
/// Для IBlockInputStream.
String getID() const;
size_t getNumberOfKeys() const { return keys_size; }
size_t getNumberOfAggregates() const { return aggregates_size; }
protected:
friend struct AggregatedDataVariants;

View File

@ -218,6 +218,9 @@ private:
/// Eliminates injective function calls and constant expressions from group by statement
void optimizeGroupBy();
/// Удалить из ORDER BY повторяющиеся элементы.
void optimizeOrderBy();
/// Превратить перечисление значений или подзапрос в ASTSet. node - функция in или notIn.
void makeSet(ASTFunction * node, const Block & sample_block);

View File

@ -162,11 +162,37 @@ void Connection::forceConnected()
}
}
struct PingTimeoutSetter
{
PingTimeoutSetter(Poco::Net::StreamSocket & socket_, const Poco::Timespan & ping_timeout_)
: socket(socket_), ping_timeout(ping_timeout_)
{
old_send_timeout = socket.getSendTimeout();
old_receive_timeout = socket.getReceiveTimeout();
if (old_send_timeout > ping_timeout)
socket.setSendTimeout(ping_timeout);
if (old_receive_timeout > ping_timeout)
socket.setReceiveTimeout(ping_timeout);
}
~PingTimeoutSetter()
{
socket.setSendTimeout(old_send_timeout);
socket.setReceiveTimeout(old_receive_timeout);
}
Poco::Net::StreamSocket & socket;
Poco::Timespan ping_timeout;
Poco::Timespan old_send_timeout;
Poco::Timespan old_receive_timeout;
};
bool Connection::ping()
{
// LOG_TRACE(log_wrapper.get(), "Ping (" << getServerAddress() << ")");
PingTimeoutSetter timeout_setter(socket, ping_timeout);
try
{
UInt64 pong = 0;

View File

@ -12,14 +12,14 @@ CollapsingFinalBlockInputStream::~CollapsingFinalBlockInputStream()
/// Нужно обезвредить все MergingBlockPtr, чтобы они не пытались класть блоки в output_blocks.
previous.block.cancel();
last_positive.block.cancel();
while (!queue.empty())
{
Cursor c = queue.top();
queue.pop();
c.block.cancel();
}
for (size_t i = 0; i < output_blocks.size(); ++i)
delete output_blocks[i];
}
@ -43,7 +43,7 @@ void CollapsingFinalBlockInputStream::fetchNextBlock(size_t input_index)
Block block = stream->read();
if (!block)
return;
MergingBlockPtr merging_block(new MergingBlock(block, input_index, description, sign_column, &output_blocks));
MergingBlockPtr merging_block(new MergingBlock(block, input_index, description, sign_column_name, &output_blocks));
++blocks_fetched;
queue.push(Cursor(merging_block));
}
@ -56,18 +56,18 @@ void CollapsingFinalBlockInputStream::commitCurrent()
{
last_positive.addToFilter();
}
if (!(count_positive == count_negative || count_positive + 1 == count_negative || count_positive == count_negative + 1))
{
if (count_incorrect_data < MAX_ERROR_MESSAGES)
reportBadCounts();
++count_incorrect_data;
}
last_positive = Cursor();
previous = Cursor();
}
count_negative = 0;
count_positive = 0;
}
@ -81,7 +81,7 @@ Block CollapsingFinalBlockInputStream::readImpl()
first = false;
}
/// Будем формировать блоки для ответа, пока не получится непустой блок.
while (true)
{
@ -89,10 +89,10 @@ Block CollapsingFinalBlockInputStream::readImpl()
{
Cursor current = queue.top();
queue.pop();
bool has_next = !queue.empty();
Cursor next = has_next ? queue.top() : Cursor();
/// Будем продвигаться в текущем блоке, не используя очередь, пока возможно.
while (true)
{
@ -101,7 +101,7 @@ Block CollapsingFinalBlockInputStream::readImpl()
commitCurrent();
previous = current;
}
Int8 sign = current.getSign();
if (sign == 1)
{
@ -116,53 +116,50 @@ Block CollapsingFinalBlockInputStream::readImpl()
}
else
reportBadSign(sign);
if (current.isLast())
{
fetchNextBlock(current.block->stream_index);
/// Все потоки кончились. Обработаем последний ключ.
if (!has_next)
{
commitCurrent();
}
break;
}
else
{
current.next();
if (has_next && !(next < current))
{
queue.push(current);
break;
}
}
}
}
/// Конец потока.
if (output_blocks.empty())
{
if (blocks_fetched != blocks_output)
LOG_ERROR(log, "Logical error: CollapsingFinalBlockInputStream has output " << blocks_output << " blocks instead of " << blocks_fetched);
return Block();
}
MergingBlock * merging_block = output_blocks.back();
Block block = merging_block->block;
for (size_t i = 0; i < block.columns(); ++i)
block.getByPosition(i).column = block.getByPosition(i).column->filter(merging_block->filter);
output_blocks.pop_back();
delete merging_block;
++blocks_output;
if (block)
return block;
}

View File

@ -23,16 +23,18 @@
int main(int argc, char ** argv)
{
using namespace DB;
try
{
size_t n = argc == 2 ? atoi(argv[1]) : 10;
DB::Block block;
Block block;
DB::ColumnWithNameAndType column_x;
ColumnWithNameAndType column_x;
column_x.name = "x";
column_x.type = new DB::DataTypeInt16;
DB::ColumnInt16 * x = new DB::ColumnInt16;
column_x.type = new DataTypeInt16;
ColumnInt16 * x = new ColumnInt16;
column_x.column = x;
auto & vec_x = x->getData();
@ -44,65 +46,64 @@ int main(int argc, char ** argv)
const char * strings[] = {"abc", "def", "abcd", "defg", "ac"};
DB::ColumnWithNameAndType column_s1;
ColumnWithNameAndType column_s1;
column_s1.name = "s1";
column_s1.type = new DB::DataTypeString;
column_s1.column = new DB::ColumnString;
column_s1.type = new DataTypeString;
column_s1.column = new ColumnString;
for (size_t i = 0; i < n; ++i)
column_s1.column->insert(std::string(strings[i % 5]));
block.insert(column_s1);
DB::ColumnWithNameAndType column_s2;
ColumnWithNameAndType column_s2;
column_s2.name = "s2";
column_s2.type = new DB::DataTypeString;
column_s2.column = new DB::ColumnString;
column_s2.type = new DataTypeString;
column_s2.column = new ColumnString;
for (size_t i = 0; i < n; ++i)
column_s2.column->insert(std::string(strings[i % 3]));
block.insert(column_s2);
DB::ColumnNumbers key_column_numbers;
key_column_numbers.push_back(0);
//key_column_numbers.push_back(1);
Names key_column_names;
key_column_names.emplace_back("x");
DB::AggregateFunctionFactory factory;
AggregateFunctionFactory factory;
DB::AggregateDescriptions aggregate_descriptions(1);
AggregateDescriptions aggregate_descriptions(1);
DB::DataTypes empty_list_of_types;
DataTypes empty_list_of_types;
aggregate_descriptions[0].function = factory.get("count", empty_list_of_types);
Poco::SharedPtr<DB::DataTypes> result_types = new DB::DataTypes
Poco::SharedPtr<DataTypes> result_types = new DataTypes
{
new DB::DataTypeInt16,
// new DB::DataTypeString,
new DB::DataTypeUInt64,
new DataTypeInt16,
// new DataTypeString,
new DataTypeUInt64,
};
DB::Block sample;
for (DB::DataTypes::const_iterator it = result_types->begin(); it != result_types->end(); ++it)
Block sample;
for (DataTypes::const_iterator it = result_types->begin(); it != result_types->end(); ++it)
{
DB::ColumnWithNameAndType col;
ColumnWithNameAndType col;
col.type = *it;
sample.insert(col);
}
DB::BlockInputStreamPtr stream = new DB::OneBlockInputStream(block);
stream = new DB::AggregatingBlockInputStream(stream, key_column_numbers, aggregate_descriptions, false, true,
0, DB::OverflowMode::THROW, nullptr, 0, 0);
BlockInputStreamPtr stream = new OneBlockInputStream(block);
stream = new AggregatingBlockInputStream(stream, key_column_names, aggregate_descriptions, false, true,
0, OverflowMode::THROW, nullptr, 0, 0);
DB::WriteBufferFromOStream ob(std::cout);
DB::RowOutputStreamPtr row_out = new DB::TabSeparatedRowOutputStream(ob, sample);
DB::BlockOutputStreamPtr out = new DB::BlockOutputStreamFromRowOutputStream(row_out);
WriteBufferFromOStream ob(std::cout);
RowOutputStreamPtr row_out = new TabSeparatedRowOutputStream(ob, sample);
BlockOutputStreamPtr out = new BlockOutputStreamFromRowOutputStream(row_out);
{
Poco::Stopwatch stopwatch;
stopwatch.start();
DB::copyData(*stream, *out);
copyData(*stream, *out);
stopwatch.stop();
std::cout << std::fixed << std::setprecision(2)
@ -115,7 +116,7 @@ int main(int argc, char ** argv)
stream->dumpTree(std::cout);
std::cout << std::endl;
}
catch (const DB::Exception & e)
catch (const Exception & e)
{
std::cerr << e.displayText() << std::endl;
}

View File

@ -85,6 +85,9 @@ void ExpressionAnalyzer::init()
/// GROUP BY injective function elimination.
optimizeGroupBy();
/// Удалить из ORDER BY повторяющиеся элементы.
optimizeOrderBy();
/// array_join_alias_to_name, array_join_result_to_source.
getArrayJoinedColumns();
@ -162,11 +165,13 @@ void ExpressionAnalyzer::analyzeAggregation()
}
NameAndTypePair key{column_name, col.type};
aggregation_keys.push_back(key);
/// Ключи агрегации уникализируются.
if (!unique_keys.count(key.name))
{
unique_keys.insert(key.name);
aggregation_keys.push_back(key);
/// key is no longer needed, therefore we can save a little by moving it
aggregated_columns.push_back(std::move(key));
}
@ -529,6 +534,38 @@ void ExpressionAnalyzer::optimizeGroupBy()
}
void ExpressionAnalyzer::optimizeOrderBy()
{
if (!(select_query && select_query->order_expression_list))
return;
/// Уникализируем условия сортировки.
using NameAndLocale = std::pair<std::string, std::string>;
std::set<NameAndLocale> elems_set;
ASTs & elems = select_query->order_expression_list->children;
ASTs unique_elems;
unique_elems.reserve(elems.size());
for (const auto & elem : elems)
{
String name = elem->children.front()->getColumnName();
const ASTOrderByElement & order_by_elem = typeid_cast<const ASTOrderByElement &>(*elem);
if (elems_set.emplace(
std::piecewise_construct,
std::forward_as_tuple(name),
std::forward_as_tuple(order_by_elem.collator ? order_by_elem.collator->getLocale() : std::string())).second)
{
unique_elems.emplace_back(elem);
}
}
if (unique_elems.size() < elems.size())
elems = unique_elems;
}
void ExpressionAnalyzer::makeSetsForIndex()
{
if (storage && ast && storage->supportsIndexForIn())
@ -1306,6 +1343,7 @@ void ExpressionAnalyzer::getAggregates(ASTPtr ast, ExpressionActionsPtr & action
AggregateDescription aggregate;
aggregate.column_name = node->getColumnName();
/// Агрегатные функции уникализируются.
for (size_t i = 0; i < aggregate_descriptions.size(); ++i)
if (aggregate_descriptions[i].column_name == aggregate.column_name)
return;
@ -1711,7 +1749,8 @@ ExpressionActionsPtr ExpressionAnalyzer::getConstActions()
void ExpressionAnalyzer::getAggregateInfo(Names & key_names, AggregateDescriptions & aggregates)
{
for (NamesAndTypesList::iterator it = aggregation_keys.begin(); it != aggregation_keys.end(); ++it)
key_names.push_back(it->name);
key_names.emplace_back(it->name);
aggregates = aggregate_descriptions;
}

View File

@ -817,12 +817,10 @@ static SortDescription getSortDescription(ASTSelectQuery & query)
{
SortDescription order_descr;
order_descr.reserve(query.order_expression_list->children.size());
for (ASTs::iterator it = query.order_expression_list->children.begin();
it != query.order_expression_list->children.end();
++it)
for (const auto & elem : query.order_expression_list->children)
{
String name = (*it)->children.front()->getColumnName();
const ASTOrderByElement & order_by_elem = typeid_cast<const ASTOrderByElement &>(**it);
String name = elem->children.front()->getColumnName();
const ASTOrderByElement & order_by_elem = typeid_cast<const ASTOrderByElement &>(*elem);
order_descr.emplace_back(name, order_by_elem.direction, order_by_elem.collator);
}

View File

@ -62,9 +62,9 @@ int main(int argc, char ** argv)
DB::BlockInputStreamPtr stream = new DB::OneBlockInputStream(block);
DB::AggregatedDataVariants aggregated_data_variants;
DB::ColumnNumbers key_column_numbers;
key_column_numbers.push_back(0);
key_column_numbers.push_back(1);
DB::Names key_column_names;
key_column_names.emplace_back("x");
key_column_names.emplace_back("s1");
DB::AggregateFunctionFactory factory;
@ -73,7 +73,7 @@ int main(int argc, char ** argv)
DB::DataTypes empty_list_of_types;
aggregate_descriptions[0].function = factory.get("count", empty_list_of_types);
DB::Aggregator aggregator(key_column_numbers, aggregate_descriptions, false, 0, DB::OverflowMode::THROW, nullptr, 0, 0);
DB::Aggregator aggregator(key_column_names, aggregate_descriptions, false, 0, DB::OverflowMode::THROW, nullptr, 0, 0);
{
Poco::Stopwatch stopwatch;

View File

@ -0,0 +1,9 @@
\0
\0\0
\0\0\0
\0\0\0\0
\0\0\0\0\0
\0\0\0\0\0\0
\0\0\0\0\0\0\0
\0\0\0\0\0\0\0\0
\0\0\0\0\0\0\0\0\0

View File

@ -0,0 +1,9 @@
SELECT materialize(toFixedString('', 1)) AS x FROM system.one GROUP BY x;
SELECT materialize(toFixedString('', 2)) AS x FROM system.one GROUP BY x;
SELECT materialize(toFixedString('', 3)) AS x FROM system.one GROUP BY x;
SELECT materialize(toFixedString('', 4)) AS x FROM system.one GROUP BY x;
SELECT materialize(toFixedString('', 5)) AS x FROM system.one GROUP BY x;
SELECT materialize(toFixedString('', 6)) AS x FROM system.one GROUP BY x;
SELECT materialize(toFixedString('', 7)) AS x FROM system.one GROUP BY x;
SELECT materialize(toFixedString('', 8)) AS x FROM system.one GROUP BY x;
SELECT materialize(toFixedString('', 9)) AS x FROM system.one GROUP BY x;

View File

@ -0,0 +1,256 @@
0 0 0
1 1 1
10 10 10
100 100 100
101 101 101
102 102 102
103 103 103
104 104 104
105 105 105
106 106 106
107 107 107
108 108 108
109 109 109
11 11 11
110 110 110
111 111 111
112 112 112
113 113 113
114 114 114
115 115 115
116 116 116
117 117 117
118 118 118
119 119 119
12 12 12
120 120 120
121 121 121
122 122 122
123 123 123
124 124 124
125 125 125
126 126 126
127 127 127
128 128 128
129 129 129
13 13 13
130 130 130
131 131 131
132 132 132
133 133 133
134 134 134
135 135 135
136 136 136
137 137 137
138 138 138
139 139 139
14 14 14
140 140 140
141 141 141
142 142 142
143 143 143
144 144 144
145 145 145
146 146 146
147 147 147
148 148 148
149 149 149
15 15 15
150 150 150
151 151 151
152 152 152
153 153 153
154 154 154
155 155 155
156 156 156
157 157 157
158 158 158
159 159 159
16 16 16
160 160 160
161 161 161
162 162 162
163 163 163
164 164 164
165 165 165
166 166 166
167 167 167
168 168 168
169 169 169
17 17 17
170 170 170
171 171 171
172 172 172
173 173 173
174 174 174
175 175 175
176 176 176
177 177 177
178 178 178
179 179 179
18 18 18
180 180 180
181 181 181
182 182 182
183 183 183
184 184 184
185 185 185
186 186 186
187 187 187
188 188 188
189 189 189
19 19 19
190 190 190
191 191 191
192 192 192
193 193 193
194 194 194
195 195 195
196 196 196
197 197 197
198 198 198
199 199 199
2 2 2
20 20 20
200 200 200
201 201 201
202 202 202
203 203 203
204 204 204
205 205 205
206 206 206
207 207 207
208 208 208
209 209 209
21 21 21
210 210 210
211 211 211
212 212 212
213 213 213
214 214 214
215 215 215
216 216 216
217 217 217
218 218 218
219 219 219
22 22 22
220 220 220
221 221 221
222 222 222
223 223 223
224 224 224
225 225 225
226 226 226
227 227 227
228 228 228
229 229 229
23 23 23
230 230 230
231 231 231
232 232 232
233 233 233
234 234 234
235 235 235
236 236 236
237 237 237
238 238 238
239 239 239
24 24 24
240 240 240
241 241 241
242 242 242
243 243 243
244 244 244
245 245 245
246 246 246
247 247 247
248 248 248
249 249 249
25 25 25
250 250 250
251 251 251
252 252 252
253 253 253
254 254 254
255 255 255
26 26 26
27 27 27
28 28 28
29 29 29
3 3 3
30 30 30
31 31 31
32 32 32
33 33 33
34 34 34
35 35 35
36 36 36
37 37 37
38 38 38
39 39 39
4 4 4
40 40 40
41 41 41
42 42 42
43 43 43
44 44 44
45 45 45
46 46 46
47 47 47
48 48 48
49 49 49
5 5 5
50 50 50
51 51 51
52 52 52
53 53 53
54 54 54
55 55 55
56 56 56
57 57 57
58 58 58
59 59 59
6 6 6
60 60 60
61 61 61
62 62 62
63 63 63
64 64 64
65 65 65
66 66 66
67 67 67
68 68 68
69 69 69
7 7 7
70 70 70
71 71 71
72 72 72
73 73 73
74 74 74
75 75 75
76 76 76
77 77 77
78 78 78
79 79 79
8 8 8
80 80 80
81 81 81
82 82 82
83 83 83
84 84 84
85 85 85
86 86 86
87 87 87
88 88 88
89 89 89
9 9 9
90 90 90
91 91 91
92 92 92
93 93 93
94 94 94
95 95 95
96 96 96
97 97 97
98 98 98
99 99 99

View File

@ -0,0 +1,3 @@
SET max_rows_to_read = 1000000;
SET read_overflow_mode = 'break';
SELECT concat(toString(number % 256 AS n), '') AS s, n, max(s) FROM system.numbers_mt GROUP BY s, n, n, n, n, n, n, n, n, n ORDER BY s, n;

View File

@ -0,0 +1 @@
SELECT n FROM (SELECT number AS n FROM system.numbers LIMIT 1000000) ORDER BY n, n, n, n, n, n, n, n, n, n LIMIT 1000000, 1;

1
libs/libzstd/README Normal file
View File

@ -0,0 +1 @@
https://github.com/Cyan4973/zstd/tree/765207c54934d478488c236749b01c7d6fc63d70/

View File

@ -0,0 +1,26 @@
ZSTD Library
Copyright (c) 2014-2015, Yann Collet
All rights reserved.
BSD License
Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice, this
list of conditions and the following disclaimer in the documentation and/or
other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,380 @@
/* ******************************************************************
FSE : Finite State Entropy coder
header file
Copyright (C) 2013-2015, Yann Collet.
BSD 2-Clause License (http://www.opensource.org/licenses/bsd-license.php)
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
You can contact the author at :
- Source repository : https://github.com/Cyan4973/FiniteStateEntropy
- Public forum : https://groups.google.com/forum/#!forum/lz4c
****************************************************************** */
#pragma once
#if defined (__cplusplus)
extern "C" {
#endif
/******************************************
* Includes
******************************************/
#include <stddef.h> // size_t, ptrdiff_t
/******************************************
* FSE simple functions
******************************************/
size_t FSE_compress(void* dst, size_t maxDstSize,
const void* src, size_t srcSize);
size_t FSE_decompress(void* dst, size_t maxDstSize,
const void* cSrc, size_t cSrcSize);
/*
FSE_compress():
Compress content of buffer 'src', of size 'srcSize', into destination buffer 'dst'.
'dst' buffer must be already allocated, and sized to handle worst case situations.
Worst case size evaluation is provided by FSE_compressBound().
return : size of compressed data
Special values : if result == 0, data is uncompressible => Nothing is stored within cSrc !!
if result == 1, data is one constant element x srcSize times. Use RLE compression.
if FSE_isError(result), it's an error code.
FSE_decompress():
Decompress FSE data from buffer 'cSrc', of size 'cSrcSize',
into already allocated destination buffer 'dst', of size 'maxDstSize'.
** Important ** : This function doesn't decompress uncompressed nor RLE data !
return : size of regenerated data (<= maxDstSize)
or an error code, which can be tested using FSE_isError()
*/
size_t FSE_decompressRLE(void* dst, size_t originalSize,
const void* cSrc, size_t cSrcSize);
/*
FSE_decompressRLE():
Decompress specific RLE corner case (equivalent to memset()).
cSrcSize must be == 1. originalSize must be exact.
return : size of regenerated data (==originalSize)
or an error code, which can be tested using FSE_isError()
Note : there is no function provided for uncompressed data, as it's just a simple memcpy()
*/
/******************************************
* Tool functions
******************************************/
size_t FSE_compressBound(size_t size); /* maximum compressed size */
/* Error Management */
unsigned FSE_isError(size_t code); /* tells if a return value is an error code */
const char* FSE_getErrorName(size_t code); /* provides error code string (useful for debugging) */
/******************************************
* FSE advanced functions
******************************************/
/*
FSE_compress2():
Same as FSE_compress(), but allows the selection of 'maxSymbolValue' and 'tableLog'
Both parameters can be defined as '0' to mean : use default value
return : size of compressed data
or -1 if there is an error
*/
size_t FSE_compress2 (void* dst, size_t dstSize, const void* src, size_t srcSize, unsigned maxSymbolValue, unsigned tableLog);
/******************************************
FSE detailed API
******************************************/
/*
int FSE_compress(char* dest, const char* source, int inputSize) does the following:
1. count symbol occurrence from source[] into table count[]
2. normalize counters so that sum(count[]) == Power_of_2 (2^tableLog)
3. save normalized counters to memory buffer using writeHeader()
4. build encoding table 'CTable' from normalized counters
5. encode the data stream using encoding table
int FSE_decompress(char* dest, int originalSize, const char* compressed) performs:
1. read normalized counters with readHeader()
2. build decoding table 'DTable' from normalized counters
3. decode the data stream using decoding table
The following API allows triggering specific sub-functions.
*/
/* *** COMPRESSION *** */
size_t FSE_count(unsigned* count, const unsigned char* src, size_t srcSize, unsigned* maxSymbolValuePtr);
unsigned FSE_optimalTableLog(unsigned tableLog, size_t srcSize, unsigned maxSymbolValue);
size_t FSE_normalizeCount(short* normalizedCounter, unsigned tableLog, const unsigned* count, size_t total, unsigned maxSymbolValue);
size_t FSE_headerBound(unsigned maxSymbolValue, unsigned tableLog);
size_t FSE_writeHeader (void* headerBuffer, size_t headerBufferSize, const short* normalizedCounter, unsigned maxSymbolValue, unsigned tableLog);
void* FSE_createCTable (unsigned tableLog, unsigned maxSymbolValue);
void FSE_freeCTable (void* CTable);
size_t FSE_buildCTable(void* CTable, const short* normalizedCounter, unsigned maxSymbolValue, unsigned tableLog);
size_t FSE_compress_usingCTable (void* dst, size_t dstSize, const void* src, size_t srcSize, const void* CTable);
/*
The first step is to count all symbols. FSE_count() provides one quick way to do this job.
Result will be saved into 'count', a table of unsigned int, which must be already allocated, and have '*maxSymbolValuePtr+1' cells.
'source' is a table of char of size 'sourceSize'. All values within 'src' MUST be <= *maxSymbolValuePtr
*maxSymbolValuePtr will be updated, with its real value (necessarily <= original value)
FSE_count() will return the number of occurrence of the most frequent symbol.
If there is an error, the function will return an ErrorCode (which can be tested using FSE_isError()).
The next step is to normalize the frequencies.
FSE_normalizeCount() will ensure that sum of frequencies is == 2 ^'tableLog'.
It also guarantees a minimum of 1 to any Symbol which frequency is >= 1.
You can use input 'tableLog'==0 to mean "use default tableLog value".
If you are unsure of which tableLog value to use, you can optionally call FSE_optimalTableLog(),
which will provide the optimal valid tableLog given sourceSize, maxSymbolValue, and a user-defined maximum (0 means "default").
The result of FSE_normalizeCount() will be saved into a table,
called 'normalizedCounter', which is a table of signed short.
'normalizedCounter' must be already allocated, and have at least 'maxSymbolValue+1' cells.
The return value is tableLog if everything proceeded as expected.
It is 0 if there is a single symbol within distribution.
If there is an error(typically, invalid tableLog value), the function will return an ErrorCode (which can be tested using FSE_isError()).
'normalizedCounter' can be saved in a compact manner to a memory area using FSE_writeHeader().
'header' buffer must be already allocated.
For guaranteed success, buffer size must be at least FSE_headerBound().
The result of the function is the number of bytes written into 'header'.
If there is an error, the function will return an ErrorCode (which can be tested using FSE_isError()) (for example, buffer size too small).
'normalizedCounter' can then be used to create the compression tables 'CTable'.
The space required by 'CTable' must be already allocated. Its size is provided by FSE_sizeof_CTable().
'CTable' must be aligned of 4 bytes boundaries.
You can then use FSE_buildCTable() to fill 'CTable'.
In both cases, if there is an error, the function will return an ErrorCode (which can be tested using FSE_isError()).
'CTable' can then be used to compress 'source', with FSE_compress_usingCTable().
Similar to FSE_count(), the convention is that 'source' is assumed to be a table of char of size 'sourceSize'
The function returns the size of compressed data (without header), or -1 if failed.
*/
/* *** DECOMPRESSION *** */
size_t FSE_readHeader (short* normalizedCounter, unsigned* maxSymbolValuePtr, unsigned* tableLogPtr, const void* headerBuffer, size_t hbSize);
void* FSE_createDTable(unsigned tableLog);
void FSE_freeDTable(void* DTable);
size_t FSE_buildDTable (void* DTable, const short* const normalizedCounter, unsigned maxSymbolValue, unsigned tableLog);
size_t FSE_decompress_usingDTable(void* dst, size_t maxDstSize, const void* cSrc, size_t cSrcSize, const void* DTable, size_t fastMode);
/*
If the block is RLE compressed, or uncompressed, use the relevant specific functions.
The first step is to obtain the normalized frequencies of symbols.
This can be performed by reading a header with FSE_readHeader().
'normalizedCounter' must be already allocated, and have at least '*maxSymbolValuePtr+1' cells of short.
In practice, that means it's necessary to know 'maxSymbolValue' beforehand,
or size the table to handle worst case situations (typically 256).
FSE_readHeader will provide 'tableLog' and 'maxSymbolValue' stored into the header.
The result of FSE_readHeader() is the number of bytes read from 'header'.
The following values have special meaning :
return 2 : there is only a single symbol value. The value is provided into the second byte of header.
return 1 : data is uncompressed
If there is an error, the function will return an error code, which can be tested using FSE_isError().
The next step is to create the decompression tables 'DTable' from 'normalizedCounter'.
This is performed by the function FSE_buildDTable().
The space required by 'DTable' must be already allocated and properly aligned.
One can create a DTable using FSE_createDTable().
The function will return 1 if DTable is compatible with fastMode, 0 otherwise.
If there is an error, the function will return an error code, which can be tested using FSE_isError().
'DTable' can then be used to decompress 'compressed', with FSE_decompress_usingDTable().
Only trigger fastMode if it was authorized by result of FSE_buildDTable(), otherwise decompression will fail.
cSrcSize must be correct, otherwise decompression will fail.
FSE_decompress_usingDTable() result will tell how many bytes were regenerated.
If there is an error, the function will return an error code, which can be tested using FSE_isError().
*/
/******************************************
* FSE streaming compression API
******************************************/
typedef struct
{
size_t bitContainer;
int bitPos;
char* startPtr;
char* ptr;
} FSE_CStream_t;
typedef struct
{
ptrdiff_t value;
const void* stateTable;
const void* symbolTT;
unsigned stateLog;
} FSE_CState_t;
void FSE_initCStream(FSE_CStream_t* bitC, void* dstBuffer);
void FSE_initCState(FSE_CState_t* CStatePtr, const void* CTable);
void FSE_encodeByte(FSE_CStream_t* bitC, FSE_CState_t* CStatePtr, unsigned char symbol);
void FSE_addBits(FSE_CStream_t* bitC, size_t value, unsigned nbBits);
void FSE_flushBits(FSE_CStream_t* bitC);
void FSE_flushCState(FSE_CStream_t* bitC, const FSE_CState_t* CStatePtr);
size_t FSE_closeCStream(FSE_CStream_t* bitC);
/*
These functions are inner components of FSE_compress_usingCTable().
They allow creation of custom streams, mixing multiple tables and bit sources.
A key property to keep in mind is that encoding and decoding are done **in reverse direction**.
So the first symbol you will encode is the last you will decode, like a lifo stack.
You will need a few variables to track your CStream. They are :
void* CTable; // Provided by FSE_buildCTable()
FSE_CStream_t bitC; // bitStream tracking structure
FSE_CState_t state; // State tracking structure
The first thing to do is to init the bitStream, and the state.
FSE_initCStream(&bitC, dstBuffer);
FSE_initState(&state, CTable);
You can then encode your input data, byte after byte.
FSE_encodeByte() outputs a maximum of 'tableLog' bits at a time.
Remember decoding will be done in reverse direction.
FSE_encodeByte(&bitStream, &state, symbol);
At any time, you can add any bit sequence.
Note : maximum allowed nbBits is 25, for compatibility with 32-bits decoders
FSE_addBits(&bitStream, bitField, nbBits);
The above methods don't commit data to memory, they just store it into local register, for speed.
Local register size is 64-bits on 64-bits systems, 32-bits on 32-bits systems (size_t).
Writing data to memory is a manual operation, performed by the flushBits function.
FSE_flushBits(&bitStream);
Your last FSE encoding operation shall be to flush your last state value(s).
FSE_flushState(&bitStream, &state);
You must then close the bitStream if you opened it with FSE_initCStream().
It's possible to embed some user-info into the header, as an optionalId [0-31].
The function returns the size in bytes of CStream.
If there is an error, it returns an errorCode (which can be tested using FSE_isError()).
size_t size = FSE_closeCStream(&bitStream, optionalId);
*/
/******************************************
* FSE streaming decompression API
******************************************/
//typedef unsigned int bitD_t;
typedef size_t bitD_t;
typedef struct
{
bitD_t bitContainer;
unsigned bitsConsumed;
const char* ptr;
const char* start;
} FSE_DStream_t;
typedef struct
{
bitD_t state;
const void* table;
} FSE_DState_t;
size_t FSE_initDStream(FSE_DStream_t* bitD, const void* srcBuffer, size_t srcSize);
void FSE_initDState(FSE_DState_t* DStatePtr, FSE_DStream_t* bitD, const void* DTable);
unsigned char FSE_decodeSymbol(FSE_DState_t* DStatePtr, FSE_DStream_t* bitD);
bitD_t FSE_readBits(FSE_DStream_t* bitD, unsigned nbBits);
unsigned int FSE_reloadDStream(FSE_DStream_t* bitD);
unsigned FSE_endOfDStream(const FSE_DStream_t* bitD);
unsigned FSE_endOfDState(const FSE_DState_t* DStatePtr);
/*
Let's now decompose FSE_decompress_usingDTable() into its unitary elements.
You will decode FSE-encoded symbols from the bitStream,
and also any other bitFields you put in, **in reverse order**.
You will need a few variables to track your bitStream. They are :
FSE_DStream_t DStream; // Stream context
FSE_DState_t DState; // State context. Multiple ones are possible
const void* DTable; // Decoding table, provided by FSE_buildDTable()
U32 tableLog; // Provided by FSE_readHeader()
The first thing to do is to init the bitStream.
errorCode = FSE_initDStream(&DStream, &optionalId, srcBuffer, srcSize);
You should then retrieve your initial state(s) (multiple ones are possible) :
errorCode = FSE_initDState(&DState, &DStream, DTable, tableLog);
You can then decode your data, symbol after symbol.
For information the maximum number of bits read by FSE_decodeSymbol() is 'tableLog'.
Keep in mind that symbols are decoded in reverse order, like a lifo stack (last in, first out).
unsigned char symbol = FSE_decodeSymbol(&DState, &DStream);
You can retrieve any bitfield you eventually stored into the bitStream (in reverse order)
Note : maximum allowed nbBits is 25
unsigned int bitField = FSE_readBits(&DStream, nbBits);
All above operations only read from local register (which size is controlled by bitD_t==32 bits).
Reading data from memory is manually performed by the reload method.
endSignal = FSE_reloadDStream(&DStream);
FSE_reloadDStream() result tells if there is still some more data to read from DStream.
0 : there is still some data left into the DStream.
1 Dstream reached end of buffer, but is not yet fully extracted. It will not load data from memory any more.
2 Dstream reached its exact end, corresponding in general to decompression completed.
3 Dstream went too far. Decompression result is corrupted.
When reaching end of buffer(1), progress slowly if you decode multiple symbols per loop,
to properly detect the exact end of stream.
After each decoded symbol, check if DStream is fully consumed using this simple test :
FSE_reloadDStream(&DStream) >= 2
When it's done, verify decompression is fully completed, by checking both DStream and the relevant states.
Checking if DStream has reached its end is performed by :
FSE_endOfDStream(&DStream);
Check also the states. There might be some entropy left there, still able to decode some high probability symbol.
FSE_endOfDState(&DState);
*/
#if defined (__cplusplus)
}
#endif

View File

@ -0,0 +1,108 @@
/* ******************************************************************
FSE : Finite State Entropy coder
header file for static linking (only)
Copyright (C) 2013-2015, Yann Collet
BSD 2-Clause License (http://www.opensource.org/licenses/bsd-license.php)
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
You can contact the author at :
- Source repository : https://github.com/Cyan4973/FiniteStateEntropy
- Public forum : https://groups.google.com/forum/#!forum/lz4c
****************************************************************** */
#pragma once
#if defined (__cplusplus)
extern "C" {
#endif
/******************************************
* Tool functions
******************************************/
#define FSE_MAX_HEADERSIZE 512
#define FSE_COMPRESSBOUND(size) (size + (size>>7) + FSE_MAX_HEADERSIZE) /* Macro can be useful for static allocation */
/******************************************
* Static allocation
******************************************/
/* You can statically allocate a CTable as a table of U32 using below macro */
#define FSE_CTABLE_SIZE_U32(maxTableLog, maxSymbolValue) (1 + (1<<(maxTableLog-1)) + ((maxSymbolValue+1)*2))
#define FSE_DTABLE_SIZE_U32(maxTableLog) ((1<<maxTableLog)+1)
/******************************************
* FSE supported API for DLL
******************************************/
#include "fse.h"
/******************************************
* Error Management
******************************************/
#define FSE_LIST_ERRORS(ITEM) \
ITEM(FSE_OK_NoError) ITEM(FSE_ERROR_GENERIC) \
ITEM(FSE_ERROR_tableLog_tooLarge) ITEM(FSE_ERROR_maxSymbolValue_tooLarge) \
ITEM(FSE_ERROR_dstSize_tooSmall) ITEM(FSE_ERROR_srcSize_wrong)\
ITEM(FSE_ERROR_corruptionDetected) \
ITEM(FSE_ERROR_maxCode)
#define FSE_GENERATE_ENUM(ENUM) ENUM,
typedef enum { FSE_LIST_ERRORS(FSE_GENERATE_ENUM) } FSE_errorCodes; /* enum is exposed, to detect & handle specific errors; compare function result to -enum value */
/******************************************
* FSE advanced API
******************************************/
size_t FSE_countFast(unsigned* count, const unsigned char* src, size_t srcSize, unsigned* maxSymbolValuePtr);
/* same as FSE_count(), but won't check if input really respect that all values within src are <= *maxSymbolValuePtr */
size_t FSE_buildCTable_raw (void* CTable, unsigned nbBits);
/* create a fake CTable, designed to not compress an input where each element uses nbBits */
size_t FSE_buildCTable_rle (void* CTable, unsigned char symbolValue);
/* create a fake CTable, designed to compress a single identical value */
size_t FSE_buildDTable_raw (void* DTable, unsigned nbBits);
/* create a fake DTable, designed to read an uncompressed bitstream where each element uses nbBits */
size_t FSE_buildDTable_rle (void* DTable, unsigned char symbolValue);
/* create a fake DTable, designed to always generate the same symbolValue */
/******************************************
* FSE streaming API
******************************************/
bitD_t FSE_readBitsFast(FSE_DStream_t* bitD, unsigned nbBits);
/* faster, but works only if nbBits >= 1 (otherwise, result will be corrupted) */
unsigned char FSE_decodeSymbolFast(FSE_DState_t* DStatePtr, FSE_DStream_t* bitD);
/* faster, but works only if nbBits >= 1 (otherwise, result will be corrupted) */
#if defined (__cplusplus)
}
#endif

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,93 @@
/*
zstd - standard compression library
Header File
Copyright (C) 2014-2015, Yann Collet.
BSD 2-Clause License (http://www.opensource.org/licenses/bsd-license.php)
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
You can contact the author at :
- zstd source repository : https://github.com/Cyan4973/zstd
- ztsd public forum : https://groups.google.com/forum/#!forum/lz4c
*/
#pragma once
#if defined (__cplusplus)
extern "C" {
#endif
/**************************************
* Includes
**************************************/
#include <stddef.h> /* size_t */
/**************************************
* Version
**************************************/
#define ZSTD_VERSION_MAJOR 0 /* for breaking interface changes */
#define ZSTD_VERSION_MINOR 0 /* for new (non-breaking) interface capabilities */
#define ZSTD_VERSION_RELEASE 2 /* for tweaks, bug-fixes, or development */
#define ZSTD_VERSION_NUMBER (ZSTD_VERSION_MAJOR *100*100 + ZSTD_VERSION_MINOR *100 + ZSTD_VERSION_RELEASE)
unsigned ZSTD_versionNumber (void);
/**************************************
* Simple one-step functions
**************************************/
size_t ZSTD_compress( void* dst, size_t maxDstSize,
const void* src, size_t srcSize);
size_t ZSTD_decompress( void* dst, size_t maxOriginalSize,
const void* src, size_t compressedSize);
/*
ZSTD_compress() :
Compresses 'srcSize' bytes from buffer 'src' into buffer 'dst', of maximum size 'dstSize'.
Destination buffer should be sized to handle worst cases situations (input data not compressible).
Worst case size evaluation is provided by function ZSTD_compressBound().
return : the number of bytes written into buffer 'dst'
or an error code if it fails (which can be tested using ZSTD_isError())
ZSTD_decompress() :
compressedSize : is obviously the source size
maxOriginalSize : is the size of the 'dst' buffer, which must be already allocated.
It must be equal or larger than originalSize, otherwise decompression will fail.
return : the number of bytes decompressed into destination buffer (originalSize)
or an errorCode if it fails (which can be tested using ZSTD_isError())
*/
/**************************************
* Tool functions
**************************************/
size_t ZSTD_compressBound(size_t srcSize); /* maximum compressed size */
/* Error Management */
unsigned ZSTD_isError(size_t code); /* tells if a return value is an error code */
const char* ZSTD_getErrorName(size_t code); /* provides error code string (useful for debugging) */
#if defined (__cplusplus)
}
#endif

View File

@ -0,0 +1,87 @@
/*
zstd - standard compression library
Header File for static linking only
Copyright (C) 2014-2015, Yann Collet.
BSD 2-Clause License (http://www.opensource.org/licenses/bsd-license.php)
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
You can contact the author at :
- zstd source repository : https://github.com/Cyan4973/zstd
- ztsd public forum : https://groups.google.com/forum/#!forum/lz4c
*/
#pragma once
#if defined (__cplusplus)
extern "C" {
#endif
/**************************************
* Includes
**************************************/
#include "zstd.h"
/**************************************
* Streaming functions
**************************************/
typedef void* ZSTD_cctx_t;
ZSTD_cctx_t ZSTD_createCCtx(void);
size_t ZSTD_freeCCtx(ZSTD_cctx_t cctx);
size_t ZSTD_compressBegin(ZSTD_cctx_t cctx, void* dst, size_t maxDstSize);
size_t ZSTD_compressContinue(ZSTD_cctx_t cctx, void* dst, size_t maxDstSize, const void* src, size_t srcSize);
size_t ZSTD_compressEnd(ZSTD_cctx_t cctx, void* dst, size_t maxDstSize);
typedef void* ZSTD_dctx_t;
ZSTD_dctx_t ZSTD_createDCtx(void);
size_t ZSTD_freeDCtx(ZSTD_dctx_t dctx);
size_t ZSTD_nextSrcSizeToDecompress(ZSTD_dctx_t dctx);
size_t ZSTD_decompressContinue(ZSTD_dctx_t dctx, void* dst, size_t maxDstSize, const void* src, size_t srcSize);
/*
Use above functions alternatively.
ZSTD_nextSrcSizeToDecompress() tells how much bytes to provide as input to ZSTD_decompressContinue().
This value is expected to be provided, precisely, as 'srcSize'.
Otherwise, compression will fail (result is an error code, which can be tested using ZSTD_isError() )
ZSTD_decompressContinue() result is the number of bytes regenerated within 'dst'.
It can be zero, which is not an error; it just means ZSTD_decompressContinue() has decoded some header.
*/
/**************************************
* Error management
**************************************/
#define ZSTD_LIST_ERRORS(ITEM) \
ITEM(ZSTD_OK_NoError) ITEM(ZSTD_ERROR_GENERIC) \
ITEM(ZSTD_ERROR_wrongMagicNumber) \
ITEM(ZSTD_ERROR_wrongSrcSize) ITEM(ZSTD_ERROR_maxDstSize_tooSmall) \
ITEM(ZSTD_ERROR_wrongLBlockSize) \
ITEM(ZSTD_ERROR_maxCode)
#define ZSTD_GENERATE_ENUM(ENUM) ENUM,
typedef enum { ZSTD_LIST_ERRORS(ZSTD_GENERATE_ENUM) } ZSTD_errorCodes; /* exposed list of errors; static linking only */
#if defined (__cplusplus)
}
#endif

View File

@ -45,6 +45,7 @@ int main(int argc, char ** argv)
("block-size,b", boost::program_options::value<unsigned>()->default_value(DBMS_DEFAULT_BUFFER_SIZE), "compress in blocks of specified size")
("hc", "use LZ4HC instead of LZ4")
("qlz", "use QuickLZ (level 1) instead of LZ4")
("zstd", "use ZSTD instead of LZ4")
("stat", "print block statistics of compressed data")
;
@ -61,17 +62,20 @@ int main(int argc, char ** argv)
try
{
bool decompress = options.count("d");
bool use_qlz = options.count("qlz");;
bool use_lz4hc = options.count("hc");;
bool use_qlz = options.count("qlz");
bool use_lz4hc = options.count("hc");
bool use_zstd = options.count("zstd");
bool stat_mode = options.count("stat");
unsigned block_size = options["block-size"].as<unsigned>();
DB::CompressionMethod::Enum method = DB::CompressionMethod::LZ4;
DB::CompressionMethod method = DB::CompressionMethod::LZ4;
if (use_qlz)
method = DB::CompressionMethod::QuickLZ;
else if (use_lz4hc)
method = DB::CompressionMethod::LZ4HC;
else if (use_zstd)
method = DB::CompressionMethod::ZSTD;
DB::ReadBufferFromFileDescriptor rb(STDIN_FILENO);
DB::WriteBufferFromFileDescriptor wb(STDOUT_FILENO);