RangeHashedDictionary added update_field support

This commit is contained in:
Maksim Kita 2021-12-03 14:06:58 +03:00
parent ca2a70cd8d
commit 4bbb02bbae
4 changed files with 156 additions and 107 deletions

View File

@ -230,20 +230,32 @@ void registerDictionarySourceClickHouse(DictionarySourceFactory & factory)
std::string settings_config_prefix = config_prefix + ".clickhouse";
std::unique_ptr<ClickHouseDictionarySource::Configuration> configuration;
std::string host = config.getString(settings_config_prefix + ".host", "localhost");
std::string user = config.getString(settings_config_prefix + ".user", "default");
std::string password = config.getString(settings_config_prefix + ".password", "");
std::string db = config.getString(settings_config_prefix + ".db", default_database);
std::string table = config.getString(settings_config_prefix + ".table", "");
UInt16 port = static_cast<UInt16>(config.getUInt(settings_config_prefix + ".port", default_port));
auto named_collection = created_from_ddl ?
getExternalDataSourceConfiguration(config, settings_config_prefix, global_context) : std::nullopt;
if (named_collection)
{
std::string host = named_collection->host;
UInt16 port = named_collection->port;
configuration = std::make_unique<ClickHouseDictionarySource::Configuration>(
ClickHouseDictionarySource::Configuration{
host = named_collection->host;
user = named_collection->username;
password = named_collection->password;
db = named_collection->database;
table = named_collection->table;
port = named_collection->port;
}
ClickHouseDictionarySource::Configuration configuration{
.host = host,
.user = named_collection->username,
.password = named_collection->password,
.db = named_collection->database,
.table = named_collection->table,
.user = user,
.password = password,
.db = db,
.table = table,
.query = config.getString(settings_config_prefix + ".query", ""),
.where = config.getString(settings_config_prefix + ".where", ""),
.invalidate_query = config.getString(settings_config_prefix + ".invalidate_query", ""),
@ -251,53 +263,31 @@ void registerDictionarySourceClickHouse(DictionarySourceFactory & factory)
.update_lag = config.getUInt64(settings_config_prefix + ".update_lag", 1),
.port = port,
.is_local = isLocalAddress({host, port}, default_port),
.secure = config.getBool(settings_config_prefix + ".secure", false)
});
}
else
{
std::string host = config.getString(settings_config_prefix + ".host", "localhost");
UInt16 port = static_cast<UInt16>(config.getUInt(settings_config_prefix + ".port", default_port));
configuration = std::make_unique<ClickHouseDictionarySource::Configuration>(
ClickHouseDictionarySource::Configuration{
.host = host,
.user = config.getString(settings_config_prefix + ".user", "default"),
.password = config.getString(settings_config_prefix + ".password", ""),
.db = config.getString(settings_config_prefix + ".db", default_database),
.table = config.getString(settings_config_prefix + ".table", ""),
.query = config.getString(settings_config_prefix + ".query", ""),
.where = config.getString(settings_config_prefix + ".where", ""),
.invalidate_query = config.getString(settings_config_prefix + ".invalidate_query", ""),
.update_field = config.getString(settings_config_prefix + ".update_field", ""),
.update_lag = config.getUInt64(settings_config_prefix + ".update_lag", 1),
.port = port,
.is_local = isLocalAddress({host, port}, default_port),
.secure = config.getBool(settings_config_prefix + ".secure", false)
});
}
.secure = config.getBool(settings_config_prefix + ".secure", false)};
ContextMutablePtr context;
if (configuration->is_local)
if (configuration.is_local)
{
/// We should set user info even for the case when the dictionary is loaded in-process (without TCP communication).
Session session(global_context, ClientInfo::Interface::LOCAL);
session.authenticate(configuration->user, configuration->password, {});
session.authenticate(configuration.user, configuration.password, {});
context = session.makeQueryContext();
}
else
{
context = Context::createCopy(global_context);
}
context->applySettingsChanges(readSettingsFromDictionaryConfig(config, config_prefix));
String dictionary_name = config.getString(".dictionary.name", "");
String dictionary_database = config.getString(".dictionary.database", "");
if (dictionary_name == configuration->table && dictionary_database == configuration->db)
if (dictionary_name == configuration.table && dictionary_database == configuration.db)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "ClickHouseDictionarySource table cannot be dictionary table");
return std::make_unique<ClickHouseDictionarySource>(dict_struct, *configuration, sample_block, context);
return std::make_unique<ClickHouseDictionarySource>(dict_struct, configuration, sample_block, context);
};
factory.registerSource("clickhouse", create_table_source);

View File

@ -78,12 +78,14 @@ RangeHashedDictionary<dictionary_key_type>::RangeHashedDictionary(
const DictionaryStructure & dict_struct_,
DictionarySourcePtr source_ptr_,
const DictionaryLifetime dict_lifetime_,
bool require_nonempty_)
bool require_nonempty_,
BlockPtr update_field_loaded_block_)
: IDictionary(dict_id_)
, dict_struct(dict_struct_)
, source_ptr{std::move(source_ptr_)}
, dict_lifetime(dict_lifetime_)
, require_nonempty(require_nonempty_)
, update_field_loaded_block(std::move(update_field_loaded_block_))
{
createAttributes();
loadData();
@ -295,7 +297,6 @@ void RangeHashedDictionary<dictionary_key_type>::createAttributes()
for (const auto & attribute : dict_struct.attributes)
{
attribute_index_by_name.emplace(attribute.name, attributes.size());
attributes.push_back(createAttribute(attribute));
if (attribute.hierarchical)
@ -307,67 +308,20 @@ void RangeHashedDictionary<dictionary_key_type>::createAttributes()
template <DictionaryKeyType dictionary_key_type>
void RangeHashedDictionary<dictionary_key_type>::loadData()
{
if (!source_ptr->hasUpdateField())
{
QueryPipeline pipeline(source_ptr->loadAll());
PullingPipelineExecutor executor(pipeline);
Block block;
while (executor.pull(block))
{
size_t skip_keys_size_offset = dict_struct.getKeysSize();
Columns key_columns;
key_columns.reserve(skip_keys_size_offset);
/// Split into keys columns and attribute columns
for (size_t i = 0; i < skip_keys_size_offset; ++i)
key_columns.emplace_back(block.safeGetByPosition(i).column);
DictionaryKeysArenaHolder<dictionary_key_type> arena_holder;
DictionaryKeysExtractor<dictionary_key_type> keys_extractor(key_columns, arena_holder.getComplexKeyArena());
const size_t keys_size = keys_extractor.getKeysSize();
element_count += keys_size;
// Support old behaviour, where invalid date means 'open range'.
const bool is_date = isDate(dict_struct.range_min->type);
const auto & min_range_column = unwrapNullableColumn(*block.safeGetByPosition(skip_keys_size_offset).column);
const auto & max_range_column = unwrapNullableColumn(*block.safeGetByPosition(skip_keys_size_offset + 1).column);
skip_keys_size_offset += 2;
for (size_t attribute_index = 0; attribute_index < attributes.size(); ++attribute_index)
{
const auto & attribute_column = *block.safeGetByPosition(attribute_index + skip_keys_size_offset).column;
auto & attribute = attributes[attribute_index];
for (size_t key_index = 0; key_index < keys_size; ++key_index)
{
auto key = keys_extractor.extractCurrentKey();
RangeStorageType lower_bound;
RangeStorageType upper_bound;
if (is_date)
{
lower_bound = getColumnIntValueOrDefault(min_range_column, key_index, is_date, 0);
upper_bound = getColumnIntValueOrDefault(max_range_column, key_index, is_date, DATE_LUT_MAX_DAY_NUM + 1);
blockToAttributes(block);
}
}
else
{
lower_bound = getColumnIntValueOrDefault(min_range_column, key_index, is_date, RANGE_MIN_NULL_VALUE);
upper_bound = getColumnIntValueOrDefault(max_range_column, key_index, is_date, RANGE_MAX_NULL_VALUE);
}
if constexpr (std::is_same_v<KeyType, StringRef>)
key = copyKeyInArena(key);
setAttributeValue(attribute, key, Range{lower_bound, upper_bound}, attribute_column[key_index]);
keys_extractor.rollbackCurrentKey();
}
keys_extractor.reset();
}
updateData();
}
if (require_nonempty && 0 == element_count)
@ -497,6 +451,106 @@ void RangeHashedDictionary<dictionary_key_type>::getItemsImpl(
found_count.fetch_add(keys_found, std::memory_order_relaxed);
}
template <DictionaryKeyType dictionary_key_type>
void RangeHashedDictionary<dictionary_key_type>::updateData()
{
if (!update_field_loaded_block || update_field_loaded_block->rows() == 0)
{
QueryPipeline pipeline(source_ptr->loadUpdatedAll());
PullingPipelineExecutor executor(pipeline);
Block block;
while (executor.pull(block))
{
/// We are using this to keep saved data if input stream consists of multiple blocks
if (!update_field_loaded_block)
update_field_loaded_block = std::make_shared<DB::Block>(block.cloneEmpty());
for (size_t attribute_index = 0; attribute_index < block.columns(); ++attribute_index)
{
const IColumn & update_column = *block.getByPosition(attribute_index).column.get();
MutableColumnPtr saved_column = update_field_loaded_block->getByPosition(attribute_index).column->assumeMutable();
saved_column->insertRangeFrom(update_column, 0, update_column.size());
}
}
}
else
{
static constexpr size_t range_columns_size = 2;
auto pipe = source_ptr->loadUpdatedAll();
mergeBlockWithPipe<dictionary_key_type>(
dict_struct.getKeysSize() + range_columns_size,
*update_field_loaded_block,
std::move(pipe));
}
if (update_field_loaded_block)
{
blockToAttributes(*update_field_loaded_block.get());
}
}
template <DictionaryKeyType dictionary_key_type>
void RangeHashedDictionary<dictionary_key_type>::blockToAttributes(const Block & block [[maybe_unused]])
{
size_t skip_keys_size_offset = dict_struct.getKeysSize();
Columns key_columns;
key_columns.reserve(skip_keys_size_offset);
/// Split into keys columns and attribute columns
for (size_t i = 0; i < skip_keys_size_offset; ++i)
key_columns.emplace_back(block.safeGetByPosition(i).column);
DictionaryKeysArenaHolder<dictionary_key_type> arena_holder;
DictionaryKeysExtractor<dictionary_key_type> keys_extractor(key_columns, arena_holder.getComplexKeyArena());
const size_t keys_size = keys_extractor.getKeysSize();
element_count += keys_size;
// Support old behaviour, where invalid date means 'open range'.
const bool is_date = isDate(dict_struct.range_min->type);
const auto & min_range_column = unwrapNullableColumn(*block.safeGetByPosition(skip_keys_size_offset).column);
const auto & max_range_column = unwrapNullableColumn(*block.safeGetByPosition(skip_keys_size_offset + 1).column);
skip_keys_size_offset += 2;
for (size_t attribute_index = 0; attribute_index < attributes.size(); ++attribute_index)
{
const auto & attribute_column = *block.safeGetByPosition(attribute_index + skip_keys_size_offset).column;
auto & attribute = attributes[attribute_index];
for (size_t key_index = 0; key_index < keys_size; ++key_index)
{
auto key = keys_extractor.extractCurrentKey();
RangeStorageType lower_bound;
RangeStorageType upper_bound;
if (is_date)
{
lower_bound = getColumnIntValueOrDefault(min_range_column, key_index, is_date, 0);
upper_bound = getColumnIntValueOrDefault(max_range_column, key_index, is_date, DATE_LUT_MAX_DAY_NUM + 1);
}
else
{
lower_bound = getColumnIntValueOrDefault(min_range_column, key_index, is_date, RANGE_MIN_NULL_VALUE);
upper_bound = getColumnIntValueOrDefault(max_range_column, key_index, is_date, RANGE_MAX_NULL_VALUE);
}
if constexpr (std::is_same_v<KeyType, StringRef>)
key = copyKeyInArena(key);
setAttributeValue(attribute, key, Range{lower_bound, upper_bound}, attribute_column[key_index]);
keys_extractor.rollbackCurrentKey();
}
keys_extractor.reset();
}
}
template <DictionaryKeyType dictionary_key_type>
template <typename T>
void RangeHashedDictionary<dictionary_key_type>::setAttributeValueImpl(Attribute & attribute, KeyType key, const Range & range, const Field & value)

View File

@ -39,7 +39,8 @@ public:
const DictionaryStructure & dict_struct_,
DictionarySourcePtr source_ptr_,
const DictionaryLifetime dict_lifetime_,
bool require_nonempty_);
bool require_nonempty_,
BlockPtr update_field_loaded_block_ = nullptr);
std::string getTypeName() const override { return "RangeHashed"; }
@ -63,7 +64,7 @@ public:
std::shared_ptr<const IExternalLoadable> clone() const override
{
return std::make_shared<RangeHashedDictionary>(getDictionaryID(), dict_struct, source_ptr->clone(), dict_lifetime, require_nonempty);
return std::make_shared<RangeHashedDictionary>(getDictionaryID(), dict_struct, source_ptr->clone(), dict_lifetime, require_nonempty, update_field_loaded_block);
}
const IDictionarySource * getSource() const override { return source_ptr.get(); }
@ -156,6 +157,10 @@ private:
ValueSetter && set_value,
DefaultValueExtractor & default_value_extractor) const;
void updateData();
void blockToAttributes(const Block & block);
template <typename T>
static void setAttributeValueImpl(Attribute & attribute, KeyType key, const Range & range, const Field & value);
@ -185,8 +190,8 @@ private:
const DictionarySourcePtr source_ptr;
const DictionaryLifetime dict_lifetime;
const bool require_nonempty;
BlockPtr update_field_loaded_block;
std::map<std::string, size_t> attribute_index_by_name;
std::vector<Attribute> attributes;
Arena complex_key_arena;

View File

@ -34,7 +34,7 @@ def started_cluster():
@pytest.mark.parametrize("dictionary_name,dictionary_type", [
("flat_update_field_dictionary", "FLAT"),
("simple_key_hashed_update_field_dictionary", "HASHED"),
("complex_key_hashed_update_field_dictionary", "HASHED")
("complex_key_hashed_update_field_dictionary", "COMPLEX_KEY_HASHED")
])
def test_update_field(started_cluster, dictionary_name, dictionary_type):
create_dictionary_query = """