mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-19 16:20:50 +00:00
Merge pull request #68591 from bigo-sg/orc_dict_encode
Add setting `output_format_orc_dictionary_key_size_threshold` to allow users to enable dictionary encoding for string columns in the ORC output format
This commit is contained in:
commit
b21be2bc54
@ -1,4 +1,4 @@
|
||||
clickhouse_add_executable(integer_hash_tables_and_hashes integer_hash_tables_and_hashes.cpp)
|
||||
clickhouse_add_executable(integer_hash_tables_and_hashes integer_hash_tables_and_hashes.cpp orc_string_dictionary.cpp)
|
||||
target_link_libraries (integer_hash_tables_and_hashes PRIVATE
|
||||
ch_contrib::gbenchmark_all
|
||||
dbms
|
||||
@ -7,3 +7,8 @@ target_link_libraries (integer_hash_tables_and_hashes PRIVATE
|
||||
ch_contrib::wyhash
|
||||
ch_contrib::farmhash
|
||||
ch_contrib::xxHash)
|
||||
|
||||
clickhouse_add_executable(orc_string_dictionary orc_string_dictionary.cpp)
|
||||
target_link_libraries (orc_string_dictionary PRIVATE
|
||||
ch_contrib::gbenchmark_all
|
||||
dbms)
|
||||
|
311
src/Common/benchmarks/orc_string_dictionary.cpp
Normal file
311
src/Common/benchmarks/orc_string_dictionary.cpp
Normal file
@ -0,0 +1,311 @@
|
||||
#include <algorithm>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <map>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

#include <base/defines.h>

#include <benchmark/benchmark.h>
|
||||
|
||||
/// Pre-optimization ("old") sorted string dictionary used by the ORC writer:
/// a std::map keyed on string content keeps entries sorted on every insert.
/// Behavior is preserved verbatim so the benchmark compares it fairly against
/// NewSortedStringDictionary.
class OldSortedStringDictionary
{
public:
    struct DictEntry
    {
        DictEntry(const char * str, size_t len) : data(str), length(len) { }
        const char * data;
        size_t length;
    };

    OldSortedStringDictionary() : totalLength(0) { }

    /// Insert a string and return its insertion order (0-based, stable for duplicates).
    size_t insert(const char * str, size_t len);

    /// Rewrite indexes recorded in insertion order into sorted-dictionary order.
    void reorder(std::vector<int64_t> & idxBuffer) const;

    /// Fill the output vector with entry pointers, positioned by insertion order.
    void getEntriesInInsertionOrder(std::vector<const DictEntry *> &) const;

    /// Count of distinct strings.
    size_t size() const;

    /// Total byte length of all distinct strings.
    uint64_t length() const;

    void clear();

    /// Insertion-order indexes of the not-null rows fed in so far.
    std::vector<int64_t> idxInDictBuffer;

private:
    /// Byte-wise lexicographic order; on a shared prefix the shorter string sorts first.
    struct LessThan
    {
        bool operator()(const DictEntry & left, const DictEntry & right) const
        {
            const int cmp = memcmp(left.data, right.data, std::min(left.length, right.length));
            if (cmp != 0)
                return cmp < 0;
            return left.length < right.length;
        }
    };

    std::map<DictEntry, size_t, LessThan> dict;
    /// Owned copies of the inserted strings; keys in `dict` point into these buffers.
    std::vector<std::vector<char>> data;
    uint64_t totalLength;
};

size_t OldSortedStringDictionary::insert(const char * str, size_t len)
{
    const auto [pos, inserted] = dict.insert({DictEntry(str, len), dict.size()});
    if (inserted)
    {
        /// Copy the bytes into storage the dictionary owns; the caller's buffer
        /// may not outlive us.
        data.emplace_back(len);
        memcpy(data.back().data(), str, len);
        /// Repoint the stored key at the owned copy. Mutating a map key is
        /// normally forbidden, but only the pointer changes here — the bytes
        /// the comparator reads are identical, so the ordering is untouched.
        auto * entry = const_cast<DictEntry *>(&pos->first);
        entry->data = data.back().data();
        totalLength += len;
    }
    return pos->second;
}

/// Translate `idxBuffer` from insertion order to dictionary (sorted) order.
///
/// String values are buffered by insertion-order index while rows stream in;
/// only once the dictionary is complete can the sorted positions demanded by
/// the ORC specification be computed, so this runs as a final pass before
/// output.
void OldSortedStringDictionary::reorder(std::vector<int64_t> & idxBuffer) const
{
    /// One pass over the sorted map builds: insertion order -> sorted position.
    std::vector<size_t> mapping(dict.size());
    size_t sortedPos = 0;
    for (const auto & [entry, insertionOrder] : dict)
        mapping[insertionOrder] = sortedPos++;

    /// Apply the mapping in place.
    for (auto & idx : idxBuffer)
        idx = static_cast<int64_t>(mapping[static_cast<size_t>(idx)]);
}

void OldSortedStringDictionary::getEntriesInInsertionOrder(std::vector<const DictEntry *> & entries) const
{
    entries.resize(dict.size());
    for (const auto & [entry, insertionOrder] : dict)
        entries[insertionOrder] = &entry;
}

size_t OldSortedStringDictionary::size() const
{
    return dict.size();
}

uint64_t OldSortedStringDictionary::length() const
{
    return totalLength;
}

void OldSortedStringDictionary::clear()
{
    totalLength = 0;
    data.clear();
    dict.clear();
}
|
||||
|
||||
|
||||
/**
 * "New" string dictionary: an unordered_map gives O(1) inserts while a flat
 * vector of entries replaces the per-insert tree rebalancing of the old one.
 */
class NewSortedStringDictionary
{
public:
    struct DictEntry
    {
        DictEntry(const char * str, size_t len) : data(str), length(len) { }
        const char * data;
        size_t length;
    };

    struct DictEntryWithIndex
    {
        DictEntryWithIndex(const char * str, size_t len, size_t index_) : entry(str, len), index(index_) { }
        DictEntry entry;
        size_t index;
    };

    NewSortedStringDictionary() : totalLength_(0) { }

    /// Insert a string and return its insertion order (0-based, stable for duplicates).
    size_t insert(const char * str, size_t len);

    /// Rewrite indexes recorded in insertion order into dictionary order.
    void reorder(std::vector<int64_t> & idxBuffer) const;

    /// Fill the output vector with entry pointers, positioned by insertion order.
    void getEntriesInInsertionOrder(std::vector<const DictEntry *> &) const;

    /// Count of distinct strings.
    size_t size() const;

    /// Total byte length of all distinct strings.
    uint64_t length() const;

    void clear();

    /// Insertion-order indexes of the not-null rows fed in so far.
    std::vector<int64_t> idxInDictBuffer;

private:
    /// Byte-wise comparator over DictEntryWithIndex.
    /// NOTE(review): not referenced in this benchmark copy (the sort below uses
    /// a lambda) — presumably kept to mirror the production class; confirm.
    struct LessThan
    {
        bool operator()(const DictEntryWithIndex & l, const DictEntryWithIndex & r)
        {
            const auto & left = l.entry;
            const auto & right = r.entry;
            const int cmp = memcmp(left.data, right.data, std::min(left.length, right.length));
            if (cmp != 0)
                return cmp < 0;
            return left.length < right.length;
        }
    };

    /// mutable: getEntriesInInsertionOrder() re-sorts it inside a const method.
    mutable std::vector<DictEntryWithIndex> flatDict_;
    std::unordered_map<std::string, size_t> keyToIndex;
    uint64_t totalLength_;
};

size_t NewSortedStringDictionary::insert(const char * str, size_t len)
{
    const size_t candidateIndex = flatDict_.size();
    const auto [pos, isNew] = keyToIndex.emplace(std::string(str, len), candidateIndex);
    if (isNew)
    {
        /// The map node owns the bytes, and unordered_map nodes are stable
        /// across rehashing, so the flat entry can point into the key string.
        flatDict_.emplace_back(pos->first.data(), pos->first.size(), candidateIndex);
        totalLength_ += len;
    }
    return pos->second;
}

/// Translate `idxBuffer` from insertion order to dictionary order, mirroring
/// OldSortedStringDictionary::reorder().
/// NOTE(review): nothing in this benchmark re-sorts flatDict_ by value before
/// this runs, so the mapping looks like the identity here — confirm against
/// the production writer, which orders the dictionary first.
void NewSortedStringDictionary::reorder(std::vector<int64_t> & idxBuffer) const
{
    /// Map insertion order -> current position in flatDict_.
    std::vector<size_t> mapping(flatDict_.size());
    for (size_t pos = 0; pos < flatDict_.size(); ++pos)
        mapping[flatDict_[pos].index] = pos;

    /// Apply the mapping in place.
    for (auto & idx : idxBuffer)
        idx = static_cast<int64_t>(mapping[static_cast<size_t>(idx)]);
}

void NewSortedStringDictionary::getEntriesInInsertionOrder(std::vector<const DictEntry *> & entries) const
{
    /// Restore ascending insertion-index order before exposing the entries.
    std::sort(
        flatDict_.begin(),
        flatDict_.end(),
        [](const DictEntryWithIndex & lhs, const DictEntryWithIndex & rhs) { return lhs.index < rhs.index; });

    entries.resize(flatDict_.size());
    for (size_t pos = 0; pos < flatDict_.size(); ++pos)
        entries[pos] = &flatDict_[pos].entry;
}

size_t NewSortedStringDictionary::size() const
{
    return flatDict_.size();
}

uint64_t NewSortedStringDictionary::length() const
{
    return totalLength_;
}

void NewSortedStringDictionary::clear()
{
    totalLength_ = 0;
    keyToIndex.clear();
    flatDict_.clear();
}
|
||||
|
||||
/// Generate a test corpus whose distinct-value count is bounded by `cardinality`.
/// @param count number of rows generated; defaults to the original 1,000,000 so
///        existing callers (the benchmarks) are unchanged.
/// @return `count` strings of the form "test string dictionary <k>", k in [0, cardinality).
/// Uses unseeded rand() deliberately: the sequence is deterministic per libc,
/// which keeps benchmark runs comparable.
template <size_t cardinality>
static std::vector<std::string> mockStrings(size_t count = 1000000)
{
    std::vector<std::string> res(count);
    for (auto & s : res)
    {
        s = "test string dictionary " + std::to_string(rand() % cardinality);
    }
    return res;
}
|
||||
|
||||
template <typename DictionaryImpl>
|
||||
static NO_INLINE std::unique_ptr<DictionaryImpl> createAndWriteStringDictionary(const std::vector<std::string> & strs)
|
||||
{
|
||||
auto dict = std::make_unique<DictionaryImpl>();
|
||||
for (const auto & str : strs)
|
||||
{
|
||||
auto index = dict->insert(str.data(), str.size());
|
||||
dict->idxInDictBuffer.push_back(index);
|
||||
}
|
||||
dict->reorder(dict->idxInDictBuffer);
|
||||
|
||||
return dict;
|
||||
}
|
||||
|
||||
template <typename DictionaryImpl, size_t cardinality>
|
||||
static void BM_writeStringDictionary(benchmark::State & state)
|
||||
{
|
||||
auto strs = mockStrings<cardinality>();
|
||||
for (auto _ : state)
|
||||
{
|
||||
auto dict = createAndWriteStringDictionary<DictionaryImpl>(strs);
|
||||
benchmark::DoNotOptimize(dict);
|
||||
}
|
||||
}
|
||||
|
||||
/// Register old-vs-new dictionary benchmarks pairwise across a range of
/// cardinalities (10 .. 100000 distinct values over the generated corpus),
/// so each cardinality's old/new timings appear side by side in the report.
BENCHMARK_TEMPLATE(BM_writeStringDictionary, OldSortedStringDictionary, 10);
BENCHMARK_TEMPLATE(BM_writeStringDictionary, NewSortedStringDictionary, 10);
BENCHMARK_TEMPLATE(BM_writeStringDictionary, OldSortedStringDictionary, 100);
BENCHMARK_TEMPLATE(BM_writeStringDictionary, NewSortedStringDictionary, 100);
BENCHMARK_TEMPLATE(BM_writeStringDictionary, OldSortedStringDictionary, 1000);
BENCHMARK_TEMPLATE(BM_writeStringDictionary, NewSortedStringDictionary, 1000);
BENCHMARK_TEMPLATE(BM_writeStringDictionary, OldSortedStringDictionary, 10000);
BENCHMARK_TEMPLATE(BM_writeStringDictionary, NewSortedStringDictionary, 10000);
BENCHMARK_TEMPLATE(BM_writeStringDictionary, OldSortedStringDictionary, 100000);
BENCHMARK_TEMPLATE(BM_writeStringDictionary, NewSortedStringDictionary, 100000);
|
||||
|
@ -1271,6 +1271,7 @@ class IColumn;
|
||||
M(Bool, output_format_orc_string_as_string, true, "Use ORC String type instead of Binary for String columns", 0) \
|
||||
M(ORCCompression, output_format_orc_compression_method, "zstd", "Compression method for ORC output format. Supported codecs: lz4, snappy, zlib, zstd, none (uncompressed)", 0) \
|
||||
M(UInt64, output_format_orc_row_index_stride, 10'000, "Target row index stride in ORC output format", 0) \
|
||||
M(Double, output_format_orc_dictionary_key_size_threshold, 0.0, "For a string column in ORC output format, if the number of distinct values is greater than this fraction of the total number of non-null rows, turn off dictionary encoding. Otherwise dictionary encoding is enabled", 0) \
|
||||
\
|
||||
M(CapnProtoEnumComparingMode, format_capn_proto_enum_comparising_mode, FormatSettings::CapnProtoEnumComparingMode::BY_VALUES, "How to map ClickHouse Enum and CapnProto Enum", 0) \
|
||||
\
|
||||
|
@ -71,6 +71,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
|
||||
},
|
||||
{"24.9",
|
||||
{
|
||||
{"output_format_orc_dictionary_key_size_threshold", 0.0, 0.0, "For a string column in ORC output format, if the number of distinct values is greater than this fraction of the total number of non-null rows, turn off dictionary encoding. Otherwise dictionary encoding is enabled"},
|
||||
{"input_format_json_empty_as_default", false, false, "Added new setting to allow to treat empty fields in JSON input as default values."},
|
||||
{"input_format_try_infer_variants", false, false, "Try to infer Variant type in text formats when there is more than one possible type for column/array elements"},
|
||||
{"join_output_by_rowlist_perkey_rows_threshold", 0, 5, "The lower limit of per-key average rows in the right table to determine whether to output by row list in hash join."},
|
||||
|
@ -244,6 +244,7 @@ FormatSettings getFormatSettings(const ContextPtr & context, const Settings & se
|
||||
format_settings.orc.output_string_as_string = settings.output_format_orc_string_as_string;
|
||||
format_settings.orc.output_compression_method = settings.output_format_orc_compression_method;
|
||||
format_settings.orc.output_row_index_stride = settings.output_format_orc_row_index_stride;
|
||||
format_settings.orc.output_dictionary_key_size_threshold = settings.output_format_orc_dictionary_key_size_threshold;
|
||||
format_settings.orc.use_fast_decoder = settings.input_format_orc_use_fast_decoder;
|
||||
format_settings.orc.filter_push_down = settings.input_format_orc_filter_push_down;
|
||||
format_settings.orc.reader_time_zone_name = settings.input_format_orc_reader_time_zone_name;
|
||||
|
@ -415,6 +415,7 @@ struct FormatSettings
|
||||
bool filter_push_down = true;
|
||||
UInt64 output_row_index_stride = 10'000;
|
||||
String reader_time_zone_name = "GMT";
|
||||
double output_dictionary_key_size_threshold = 0.0;
|
||||
} orc{};
|
||||
|
||||
/// For capnProto format we should determine how to
|
||||
|
@ -78,7 +78,9 @@ void ORCOutputStream::write(const void* buf, size_t length)
|
||||
}
|
||||
|
||||
ORCBlockOutputFormat::ORCBlockOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_)
|
||||
: IOutputFormat(header_, out_), format_settings{format_settings_}, output_stream(out_)
|
||||
: IOutputFormat(header_, out_)
|
||||
, format_settings{format_settings_}
|
||||
, output_stream(out_)
|
||||
{
|
||||
for (const auto & type : header_.getDataTypes())
|
||||
data_types.push_back(recursiveRemoveLowCardinality(type));
|
||||
@ -565,6 +567,7 @@ void ORCBlockOutputFormat::prepareWriter()
|
||||
schema = orc::createStructType();
|
||||
options.setCompression(getORCCompression(format_settings.orc.output_compression_method));
|
||||
options.setRowIndexStride(format_settings.orc.output_row_index_stride);
|
||||
options.setDictionaryKeySizeThreshold(format_settings.orc.output_dictionary_key_size_threshold);
|
||||
size_t columns_count = header.columns();
|
||||
for (size_t i = 0; i != columns_count; ++i)
|
||||
schema->addStructField(header.safeGetByPosition(i).name, getORCType(recursiveRemoveLowCardinality(data_types[i])));
|
||||
|
Loading…
Reference in New Issue
Block a user