Merge pull request #3200 from yandex/low-cardinality-function-result-cache

Added cache for low cardinality function result.
This commit is contained in:
alexey-milovidov 2018-09-26 18:12:48 +03:00 committed by GitHub
commit b6e53d22e1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 207 additions and 73 deletions

View File

@ -109,8 +109,8 @@ namespace
}
ColumnWithDictionary::ColumnWithDictionary(MutableColumnPtr && column_unique_, MutableColumnPtr && indexes_)
: dictionary(std::move(column_unique_)), idx(std::move(indexes_))
ColumnWithDictionary::ColumnWithDictionary(MutableColumnPtr && column_unique_, MutableColumnPtr && indexes_, bool is_shared)
: dictionary(std::move(column_unique_), is_shared), idx(std::move(indexes_))
{
idx.check(getDictionary().size());
}
@ -621,13 +621,13 @@ void ColumnWithDictionary::Index::countKeys(ColumnUInt64::Container & counts) co
}
ColumnWithDictionary::Dictionary::Dictionary(MutableColumnPtr && column_unique_)
: column_unique(std::move(column_unique_))
ColumnWithDictionary::Dictionary::Dictionary(MutableColumnPtr && column_unique_, bool is_shared)
: column_unique(std::move(column_unique_)), shared(is_shared)
{
checkColumn(*column_unique);
}
ColumnWithDictionary::Dictionary::Dictionary(ColumnPtr column_unique_)
: column_unique(std::move(column_unique_))
ColumnWithDictionary::Dictionary::Dictionary(ColumnPtr column_unique_, bool is_shared)
: column_unique(std::move(column_unique_)), shared(is_shared)
{
checkColumn(*column_unique);
}

View File

@ -17,7 +17,7 @@ class ColumnWithDictionary final : public COWPtrHelper<IColumn, ColumnWithDictio
{
friend class COWPtrHelper<IColumn, ColumnWithDictionary>;
ColumnWithDictionary(MutableColumnPtr && column_unique, MutableColumnPtr && indexes);
ColumnWithDictionary(MutableColumnPtr && column_unique, MutableColumnPtr && indexes, bool is_shared = false);
ColumnWithDictionary(const ColumnWithDictionary & other) = default;
public:
@ -25,14 +25,15 @@ public:
* Use IColumn::mutate in order to make mutable column and mutate shared nested columns.
*/
using Base = COWPtrHelper<IColumn, ColumnWithDictionary>;
static Ptr create(const ColumnPtr & column_unique_, const ColumnPtr & indexes_)
static Ptr create(const ColumnPtr & column_unique_, const ColumnPtr & indexes_, bool is_shared = false)
{
return ColumnWithDictionary::create(column_unique_->assumeMutable(), indexes_->assumeMutable());
return ColumnWithDictionary::create(column_unique_->assumeMutable(), indexes_->assumeMutable(), is_shared);
}
template <typename ... Args, typename = typename std::enable_if<IsMutableColumns<Args ...>::value>::type>
static MutablePtr create(Args &&... args) { return Base::create(std::forward<Args>(args)...); }
static MutablePtr create(MutableColumnPtr && column_unique, MutableColumnPtr && indexes, bool is_shared = false)
{
return Base::create(std::move(column_unique), std::move(indexes), is_shared);
}
std::string getName() const override { return "ColumnWithDictionary"; }
const char * getFamilyName() const override { return "ColumnWithDictionary"; }
@ -240,8 +241,8 @@ private:
{
public:
Dictionary(const Dictionary & other) = default;
explicit Dictionary(MutableColumnPtr && column_unique);
explicit Dictionary(ColumnPtr column_unique);
explicit Dictionary(MutableColumnPtr && column_unique, bool is_shared);
explicit Dictionary(ColumnPtr column_unique, bool is_shared);
const ColumnPtr & getColumnUniquePtr() const { return column_unique; }
ColumnPtr & getColumnUniquePtr() { return column_unique; }

View File

@ -1401,7 +1401,7 @@ public:
const DataTypes & getArgumentTypes() const override { return argument_types; }
const DataTypePtr & getReturnType() const override { return return_type; }
PreparedFunctionPtr prepare(const Block & /*sample_block*/) const override
PreparedFunctionPtr prepare(const Block & /*sample_block*/, const ColumnNumbers & /*arguments*/, size_t /*result*/) const override
{
return std::make_shared<PreparedFunctionCast>(
prepareUnpackDictionaries(getArgumentTypes()[0], getReturnType()), name);

View File

@ -29,7 +29,7 @@ public:
const DataTypes & getArgumentTypes() const override { return argument_types; }
const DataTypePtr & getReturnType() const override { return return_type; }
PreparedFunctionPtr prepare(const Block &) const override
PreparedFunctionPtr prepare(const Block &, const ColumnNumbers &, size_t) const override
{
return std::const_pointer_cast<FunctionExpression>(shared_from_this());
}
@ -113,7 +113,7 @@ public:
const DataTypes & getArgumentTypes() const override { return captured_types; }
const DataTypePtr & getReturnType() const override { return return_type; }
PreparedFunctionPtr prepare(const Block &) const override
PreparedFunctionPtr prepare(const Block &, const ColumnNumbers &, size_t) const override
{
return std::const_pointer_cast<FunctionCapture>(shared_from_this());
}

View File

@ -1,5 +1,6 @@
#include <Common/config.h>
#include <Common/typeid_cast.h>
#include <Common/LRUCache.h>
#include <Columns/ColumnConst.h>
#include <Columns/ColumnNullable.h>
#include <DataTypes/DataTypeArray.h>
@ -41,6 +42,64 @@ namespace ErrorCodes
}
class PreparedFunctionLowCardinalityResultCache
{
public:
/// Will assume that dictionaries with same hash has the same keys.
/// Just in case, check that they have also the same size.
struct DictionaryKey
{
UInt128 hash;
UInt64 size;
bool operator== (const DictionaryKey & other) const { return hash == other.hash && size == other.size; }
};
struct DictionaryKeyHash
{
size_t operator()(const DictionaryKey & key) const
{
SipHash hash;
hash.update(key.hash.low);
hash.update(key.hash.high);
hash.update(key.size);
return hash.get64();
}
};
struct CachedValues
{
/// Store ptr to dictionary to be sure it won't be deleted.
ColumnPtr dictionary_holder;
ColumnUniquePtr function_result;
/// Remap positions. new_pos = index_mapping->index(old_pos);
ColumnPtr index_mapping;
};
using CachedValuesPtr = std::shared_ptr<CachedValues>;
explicit PreparedFunctionLowCardinalityResultCache(size_t cache_size) : cache(cache_size) {}
CachedValuesPtr get(const DictionaryKey & key) { return cache.get(key); }
void set(const DictionaryKey & key, const CachedValuesPtr & mapped) { cache.set(key, mapped); }
CachedValuesPtr getOrSet(const DictionaryKey & key, const CachedValuesPtr & mapped)
{
return cache.getOrSet(key, [&]() { return mapped; }).first;
}
private:
using Cache = LRUCache<DictionaryKey, CachedValues, DictionaryKeyHash>;
Cache cache;
};
/// Creates the low cardinality result cache with the given size.
/// If a cache has already been created, it is kept and cache_size is ignored.
void PreparedFunctionImpl::createLowCardinalityResultCache(size_t cache_size)
{
    if (low_cardinality_result_cache)
        return;

    low_cardinality_result_cache = std::make_shared<PreparedFunctionLowCardinalityResultCache>(cache_size);
}
static DataTypePtr recursiveRemoveLowCardinality(const DataTypePtr & type)
{
if (!type)
@ -291,6 +350,25 @@ void PreparedFunctionImpl::executeWithoutColumnsWithDictionary(Block & block, co
executeImpl(block, args, result, input_rows_count);
}
/// Looks through the argument columns of the block for a ColumnWithDictionary.
/// Returns the single such column, or nullptr if no argument is one.
/// Throws LOGICAL_ERROR if more than one argument is a ColumnWithDictionary.
static const ColumnWithDictionary * findLowCardinalityArgument(const Block & block, const ColumnNumbers & args)
{
    const ColumnWithDictionary * found = nullptr;

    for (auto position : args)
    {
        const ColumnWithTypeAndName & argument = block.getByPosition(position);
        const auto * candidate = checkAndGetColumn<ColumnWithDictionary>(argument.column.get());

        /// Not a dictionary-encoded column: nothing to record for this argument.
        if (!candidate)
            continue;

        if (found)
            throw Exception("Expected single dictionary argument for function.", ErrorCodes::LOGICAL_ERROR);

        found = candidate;
    }

    return found;
}
static ColumnPtr replaceColumnsWithDictionaryByNestedAndGetDictionaryIndexes(
Block & block, const ColumnNumbers & args, bool can_be_executed_on_default_arguments)
{
@ -361,28 +439,59 @@ void PreparedFunctionImpl::execute(Block & block, const ColumnNumbers & args, si
if (auto * res_type_with_dict = typeid_cast<const DataTypeWithDictionary *>(res.type.get()))
{
const auto * low_cardinality_column = findLowCardinalityArgument(block, args);
bool use_cache = low_cardinality_result_cache
&& low_cardinality_column && low_cardinality_column->isSharedDictionary();
PreparedFunctionLowCardinalityResultCache::DictionaryKey key;
if (use_cache)
{
const auto & dictionary = low_cardinality_column->getDictionary();
key = {dictionary.getHash(), dictionary.size()};
auto cached_values = low_cardinality_result_cache->get(key);
if (cached_values)
{
auto indexes = cached_values->index_mapping->index(low_cardinality_column->getIndexes(), 0);
res.column = ColumnWithDictionary::create(cached_values->function_result, indexes, true);
return;
}
}
block_without_dicts.safeGetByPosition(result).type = res_type_with_dict->getDictionaryType();
ColumnPtr indexes = replaceColumnsWithDictionaryByNestedAndGetDictionaryIndexes(
block_without_dicts, args, canBeExecutedOnDefaultArguments());
executeWithoutColumnsWithDictionary(block_without_dicts, args, result, block_without_dicts.rows());
auto res_column = res.type->createColumn();
auto * column_with_dictionary = typeid_cast<ColumnWithDictionary *>(res_column.get());
if (!column_with_dictionary)
throw Exception("Expected LowCardinality column, got " + res_column->getName(), ErrorCodes::LOGICAL_ERROR);
auto & keys = block_without_dicts.safeGetByPosition(result).column;
if (auto full_column = keys->convertToFullColumnIfConst())
keys = full_column;
if (indexes)
column_with_dictionary->insertRangeFromDictionaryEncodedColumn(*keys, *indexes);
else
column_with_dictionary->insertRangeFromFullColumn(*keys, 0, keys->size());
auto res_mut_dictionary = DataTypeWithDictionary::createColumnUnique(*res_type_with_dict->getDictionaryType());
ColumnPtr res_indexes = res_mut_dictionary->uniqueInsertRangeFrom(*keys, 0, keys->size());
ColumnUniquePtr res_dictionary = std::move(res_mut_dictionary);
res.column = std::move(res_column);
if (indexes)
{
if (use_cache)
{
auto cache_values = std::make_shared<PreparedFunctionLowCardinalityResultCache::CachedValues>();
cache_values->dictionary_holder = low_cardinality_column->getDictionaryPtr();
cache_values->function_result = res_dictionary;
cache_values->index_mapping = res_indexes;
cache_values = low_cardinality_result_cache->getOrSet(key, cache_values);
res_dictionary = cache_values->function_result;
res_indexes = cache_values->index_mapping;
}
res.column = ColumnWithDictionary::create(res_dictionary, res_indexes->index(*indexes, 0), use_cache);
}
else
{
res.column = ColumnWithDictionary::create(res_dictionary, res_indexes);
}
}
else
{

View File

@ -45,11 +45,20 @@ public:
using PreparedFunctionPtr = std::shared_ptr<IPreparedFunction>;
/// Cache for function results when the function is executed on a low cardinality column.
class PreparedFunctionLowCardinalityResultCache;
using PreparedFunctionLowCardinalityResultCachePtr = std::shared_ptr<PreparedFunctionLowCardinalityResultCache>;
class PreparedFunctionImpl : public IPreparedFunction
{
public:
void execute(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) final;
/// Create cache which will be used to store result of function executed on LowCardinality column.
/// Only for default LowCardinality implementation.
/// Cannot be called concurrently for the same object.
void createLowCardinalityResultCache(size_t cache_size);
protected:
virtual void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) = 0;
@ -85,11 +94,14 @@ protected:
private:
bool defaultImplementationForNulls(Block & block, const ColumnNumbers & args, size_t result,
size_t input_rows_count);
size_t input_rows_count);
bool defaultImplementationForConstantArguments(Block & block, const ColumnNumbers & args, size_t result,
size_t input_rows_count);
size_t input_rows_count);
void executeWithoutColumnsWithDictionary(Block & block, const ColumnNumbers & arguments, size_t result,
size_t input_rows_count);
/// Cache is created by function createLowCardinalityResultCache()
PreparedFunctionLowCardinalityResultCachePtr low_cardinality_result_cache;
};
using ValuePlaceholders = std::vector<std::function<llvm::Value * ()>>;
@ -108,12 +120,12 @@ public:
/// Do preparations and return executable.
/// sample_block should contain data types of arguments and values of constants, if relevant.
virtual PreparedFunctionPtr prepare(const Block & sample_block) const = 0;
virtual PreparedFunctionPtr prepare(const Block & sample_block, const ColumnNumbers & arguments, size_t result) const = 0;
/// TODO: make const
virtual void execute(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count)
{
return prepare(block)->execute(block, arguments, result, input_rows_count);
return prepare(block, arguments, result)->execute(block, arguments, result, input_rows_count);
}
#if USE_EMBEDDED_COMPILER
@ -322,7 +334,7 @@ public:
using FunctionBuilderImpl::getLambdaArgumentTypesImpl;
using FunctionBuilderImpl::getReturnType;
PreparedFunctionPtr prepare(const Block & /*sample_block*/) const final
PreparedFunctionPtr prepare(const Block & /*sample_block*/, const ColumnNumbers & /*arguments*/, size_t /*result*/) const final
{
throw Exception("prepare is not implemented for IFunction", ErrorCodes::NOT_IMPLEMENTED);
}
@ -421,7 +433,10 @@ public:
#endif
PreparedFunctionPtr prepare(const Block & /*sample_block*/) const override { return std::make_shared<DefaultExecutable>(function); }
PreparedFunctionPtr prepare(const Block & /*sample_block*/, const ColumnNumbers & /*arguments*/, size_t /*result*/) const override
{
return std::make_shared<DefaultExecutable>(function);
}
bool isSuitableForConstantFolding() const override { return function->isSuitableForConstantFolding(); }

View File

@ -174,7 +174,7 @@ ExpressionAction ExpressionAction::ordinaryJoin(std::shared_ptr<const Join> join
}
void ExpressionAction::prepare(Block & sample_block)
void ExpressionAction::prepare(Block & sample_block, const Settings & settings)
{
// std::cerr << "preparing: " << toString() << std::endl;
@ -199,16 +199,16 @@ void ExpressionAction::prepare(Block & sample_block)
all_const = false;
}
size_t result_position = sample_block.columns();
sample_block.insert({nullptr, result_type, result_name});
function = function_base->prepare(sample_block, arguments, result_position);
if (auto * prepared_function = dynamic_cast<PreparedFunctionImpl *>(function.get()))
prepared_function->createLowCardinalityResultCache(settings.max_threads);
/// If all arguments are constants, and function is suitable to be executed in 'prepare' stage - execute function.
if (all_const && function->isSuitableForConstantFolding())
if (all_const && function_base->isSuitableForConstantFolding())
{
size_t result_position = sample_block.columns();
ColumnWithTypeAndName new_column;
new_column.name = result_name;
new_column.type = result_type;
sample_block.insert(std::move(new_column));
function->execute(sample_block, arguments, result_position, sample_block.rows());
/// If the result is not a constant, just in case, we will consider the result as unknown.
@ -227,10 +227,6 @@ void ExpressionAction::prepare(Block & sample_block)
col.column = col.column->cloneResized(1);
}
}
else
{
sample_block.insert({nullptr, result_type, result_name});
}
break;
}
@ -569,7 +565,7 @@ std::string ExpressionAction::toString() const
case APPLY_FUNCTION:
ss << "FUNCTION " << result_name << " " << (is_function_compiled ? "[compiled] " : "")
<< (result_type ? result_type->getName() : "(no type)") << " = "
<< (function ? function->getName() : "(no function)") << "(";
<< (function_base ? function_base->getName() : "(no function)") << "(";
for (size_t i = 0; i < argument_names.size(); ++i)
{
if (i)
@ -688,11 +684,11 @@ void ExpressionActions::addImpl(ExpressionAction action, Names & new_names)
arguments[i] = sample_block.getByName(action.argument_names[i]);
}
action.function = action.function_builder->build(arguments);
action.result_type = action.function->getReturnType();
action.function_base = action.function_builder->build(arguments);
action.result_type = action.function_base->getReturnType();
}
action.prepare(sample_block);
action.prepare(sample_block, settings);
actions.push_back(action);
}
@ -927,6 +923,7 @@ void ExpressionActions::finalize(const Names & output_columns)
action.result_type = result.type;
action.added_column = result.column;
action.function_builder = nullptr;
action.function_base = nullptr;
action.function = nullptr;
action.argument_names.clear();
in.clear();
@ -1166,10 +1163,10 @@ UInt128 ExpressionAction::ActionHash::operator()(const ExpressionAction & action
hash.update(action.result_name);
if (action.result_type)
hash.update(action.result_type->getName());
if (action.function)
if (action.function_base)
{
hash.update(action.function->getName());
for (const auto & arg_type : action.function->getArgumentTypes())
hash.update(action.function_base->getName());
for (const auto & arg_type : action.function_base->getArgumentTypes())
hash.update(arg_type->getName());
}
for (const auto & arg_name : action.argument_names)
@ -1209,15 +1206,15 @@ bool ExpressionAction::operator==(const ExpressionAction & other) const
return false;
}
if (function != other.function)
if (function_base != other.function_base)
{
if (function == nullptr || other.function == nullptr)
if (function_base == nullptr || other.function_base == nullptr)
return false;
else if (function->getName() != other.function->getName())
else if (function_base->getName() != other.function_base->getName())
return false;
const auto & my_arg_types = function->getArgumentTypes();
const auto & other_arg_types = other.function->getArgumentTypes();
const auto & my_arg_types = function_base->getArgumentTypes();
const auto & other_arg_types = other.function_base->getArgumentTypes();
if (my_arg_types.size() != other_arg_types.size())
return false;

View File

@ -25,6 +25,9 @@ using NamesWithAliases = std::vector<NameWithAlias>;
class Join;
class IPreparedFunction;
using PreparedFunctionPtr = std::shared_ptr<IPreparedFunction>;
class IFunctionBase;
using FunctionBasePtr = std::shared_ptr<IFunctionBase>;
@ -87,7 +90,8 @@ public:
/// For APPLY_FUNCTION and LEFT ARRAY JOIN.
FunctionBuilderPtr function_builder;
FunctionBasePtr function;
FunctionBasePtr function_base;
PreparedFunctionPtr function;
Names argument_names;
bool is_function_compiled = false;
@ -135,7 +139,7 @@ public:
private:
friend class ExpressionActions;
void prepare(Block & sample_block);
void prepare(Block & sample_block, const Settings & settings);
size_t getInputRowsCount(Block & block, std::unordered_map<std::string, size_t> & input_rows_counts) const;
void execute(Block & block, std::unordered_map<std::string, size_t> & input_rows_counts) const;
void executeOnTotals(Block & block) const;

View File

@ -492,7 +492,7 @@ LLVMFunction::LLVMFunction(const ExpressionActions::Actions & actions, std::shar
for (const auto & action : actions)
{
const auto & names = action.argument_names;
const auto & types = action.function->getArgumentTypes();
const auto & types = action.function_base->getArgumentTypes();
std::vector<CompilableExpression> args;
for (size_t i = 0; i < names.size(); ++i)
{
@ -504,13 +504,13 @@ LLVMFunction::LLVMFunction(const ExpressionActions::Actions & actions, std::shar
}
args.push_back(inserted.first->second);
}
subexpressions[action.result_name] = subexpression(*action.function, std::move(args));
originals.push_back(action.function);
subexpressions[action.result_name] = subexpression(*action.function_base, std::move(args));
originals.push_back(action.function_base);
}
compileFunctionToLLVMByteCode(context, *this);
}
PreparedFunctionPtr LLVMFunction::prepare(const Block &) const { return std::make_shared<LLVMPreparedFunction>(name, context); }
PreparedFunctionPtr LLVMFunction::prepare(const Block &, const ColumnNumbers &, size_t) const { return std::make_shared<LLVMPreparedFunction>(name, context); }
bool LLVMFunction::isDeterministic() const
{
@ -658,7 +658,7 @@ std::vector<std::unordered_set<std::optional<size_t>>> getActionsDependents(cons
case ExpressionAction::APPLY_FUNCTION:
{
dependents[i] = current_dependents[actions[i].result_name];
const bool compilable = isCompilable(*actions[i].function);
const bool compilable = isCompilable(*actions[i].function_base);
for (const auto & name : actions[i].argument_names)
{
if (compilable)
@ -696,7 +696,7 @@ void compileFunctions(ExpressionActions::Actions & actions, const Names & output
std::vector<ExpressionActions::Actions> fused(actions.size());
for (size_t i = 0; i < actions.size(); ++i)
{
if (actions[i].type != ExpressionAction::APPLY_FUNCTION || !isCompilable(*actions[i].function))
if (actions[i].type != ExpressionAction::APPLY_FUNCTION || !isCompilable(*actions[i].function_base))
continue;
fused[i].push_back(actions[i]);
@ -739,7 +739,7 @@ void compileFunctions(ExpressionActions::Actions & actions, const Names & output
ProfileEvents::increment(ProfileEvents::CompileExpressionsMicroseconds, watch.elapsedMicroseconds());
}
actions[i].function = fn;
actions[i].function_base = fn;
actions[i].argument_names = fn->getArgumentNames();
actions[i].is_function_compiled = true;
@ -758,6 +758,14 @@ void compileFunctions(ExpressionActions::Actions & actions, const Names & output
size_t used_memory = context->compileAllFunctionsToNativeCode();
ProfileEvents::increment(ProfileEvents::CompileExpressionsBytes, used_memory);
}
for (size_t i = 0; i < actions.size(); ++i)
{
if (actions[i].type == ExpressionAction::APPLY_FUNCTION && actions[i].is_function_compiled)
{
actions[i].function = actions[i].function_base->prepare({}, {}, 0); /// Arguments are not used for LLVMFunction.
}
}
}
}

View File

@ -40,7 +40,7 @@ public:
const DataTypePtr & getReturnType() const override { return originals.back()->getReturnType(); }
PreparedFunctionPtr prepare(const Block &) const override;
PreparedFunctionPtr prepare(const Block &, const ColumnNumbers &, size_t) const override;
bool isDeterministic() const override;

View File

@ -425,17 +425,17 @@ bool KeyCondition::canConstantBeWrappedByMonotonicFunctions(
const auto & action = a.argument_names;
if (a.type == ExpressionAction::Type::APPLY_FUNCTION && action.size() == 1 && a.argument_names[0] == expr_name)
{
if (!a.function->hasInformationAboutMonotonicity())
if (!a.function_base->hasInformationAboutMonotonicity())
return false;
// Range is irrelevant in this case
IFunction::Monotonicity monotonicity = a.function->getMonotonicityForRange(*out_type, Field(), Field());
IFunction::Monotonicity monotonicity = a.function_base->getMonotonicityForRange(*out_type, Field(), Field());
if (!monotonicity.is_always_monotonic)
return false;
// Apply the next transformation step
DataTypePtr new_type;
applyFunction(a.function, out_type, out_value, new_type, out_value);
applyFunction(a.function_base, out_type, out_value, new_type, out_value);
if (!new_type)
return false;

View File

@ -194,7 +194,7 @@ static void checkKeyExpression(const ExpressionActions & expr, const Block & sam
if (action.type == ExpressionAction::APPLY_FUNCTION)
{
IFunctionBase & func = *action.function;
IFunctionBase & func = *action.function_base;
if (!func.isDeterministic())
throw Exception(key_name + " key cannot contain non-deterministic functions, "
"but contains function " + func.getName(),