ExecutableDictionarySource added implicit_key option

This commit is contained in:
Maksim Kita 2021-01-26 23:49:52 +03:00
parent ef72ba7349
commit dafb0ef4e9
12 changed files with 347 additions and 35 deletions

View File

@ -409,6 +409,15 @@ Block Block::cloneWithoutColumns() const
return res;
}
Block Block::cloneWithCutColumns(size_t start, size_t length) const
{
Block copy = *this;
for (size_t i = 0; i < copy.data.size(); ++i)
copy.data[i].column = copy.data[i].column->cut(start, length);
return copy;
}
Block Block::sortColumns() const
{

View File

@ -129,6 +129,7 @@ public:
void setColumns(const Columns & columns);
Block cloneWithColumns(const Columns & columns) const;
Block cloneWithoutColumns() const;
Block cloneWithCutColumns(size_t start, size_t length) const;
/** Get empty columns with the same types as in block. */
MutableColumns cloneEmptyColumns() const;

View File

@ -1,6 +1,5 @@
#include "DictionarySourceHelpers.h"
#include <Columns/ColumnsNumber.h>
#include <Core/Block.h>
#include <Core/ColumnWithTypeAndName.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataTypes/DataTypesNumber.h>
@ -13,44 +12,54 @@
namespace DB
{
/// For simple key
void formatIDs(BlockOutputStreamPtr & out, const std::vector<UInt64> & ids)
void formatWithBlock(BlockOutputStreamPtr & out, Block block)
{
auto column = ColumnUInt64::create(ids.size());
memcpy(column->getData().data(), ids.data(), ids.size() * sizeof(ids.front()));
Block block{{std::move(column), std::make_shared<DataTypeUInt64>(), "id"}};
out->writePrefix();
out->write(block);
out->writeSuffix();
out->flush();
}
/// For simple key
Block blockForIds(const std::vector<UInt64> & ids)
{
auto column = ColumnUInt64::create(ids.size());
memcpy(column->getData().data(), ids.data(), ids.size() * sizeof(ids.front()));
Block block{{std::move(column), std::make_shared<DataTypeUInt64>(), "id"}};
std::cerr << "Block for IDs size " << ids.size() << std::endl;
return block;
}
/// For composite key
void formatKeys(
Block blockForKeys(
const DictionaryStructure & dict_struct,
BlockOutputStreamPtr & out,
const Columns & key_columns,
const std::vector<size_t> & requested_rows)
{
Block block;
for (size_t i = 0, size = key_columns.size(); i < size; ++i)
{
const ColumnPtr & source_column = key_columns[i];
auto filtered_column = source_column->cloneEmpty();
filtered_column->reserve(requested_rows.size());
size_t column_rows_size = source_column->size();
PaddedPODArray<UInt8> filter(column_rows_size, false);
for (size_t idx : requested_rows)
filtered_column->insertFrom(*source_column, idx);
filter[idx] = true;
block.insert({std::move(filtered_column), (*dict_struct.key)[i].type, toString(i)});
auto filtered_column = source_column->filter(filter, requested_rows.size());
block.insert({std::move(filtered_column), (*dict_struct.key)[i].type, (*dict_struct.key)[i].name});
}
out->writePrefix();
out->write(block);
out->writeSuffix();
out->flush();
return block;
}
Context copyContextAndApplySettings(

View File

@ -1,11 +1,15 @@
#pragma once
#include <vector>
#include <Columns/IColumn.h>
#include <common/types.h>
#include <Poco/File.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Columns/IColumn.h>
#include <Core/Block.h>
namespace DB
{
class IBlockOutputStream;
@ -16,13 +20,16 @@ class Context;
/// Write keys to block output stream.
void formatWithBlock(BlockOutputStreamPtr & out, Block block);
/// For simple key
void formatIDs(BlockOutputStreamPtr & out, const std::vector<UInt64> & ids);
Block blockForIds(const std::vector<UInt64> & ids);
/// For composite key
void formatKeys(
Block blockForKeys(
const DictionaryStructure & dict_struct,
BlockOutputStreamPtr & out,
const Columns & key_columns,
const std::vector<size_t> & requested_rows);
@ -36,4 +43,5 @@ void applySettingsToContext(
const std::string & config_prefix,
Context & context,
const Poco::Util::AbstractConfiguration & config);
}

View File

@ -281,6 +281,21 @@ size_t DictionaryStructure::getKeySize() const
});
}
Strings DictionaryStructure::getKeysNames() const
{
if (id)
return { id->name };
auto & key_attributes = *key;
Strings keys_names;
keys_names.reserve(key_attributes.size());
for (const auto & key_attribute : key_attributes)
keys_names.emplace_back(key_attribute.name);
return keys_names;
}
static void checkAttributeKeys(const Poco::Util::AbstractConfiguration::Keys & keys)
{

View File

@ -158,6 +158,8 @@ struct DictionaryStructure final
std::string getKeyDescription() const;
bool isKeySizeFixed() const;
size_t getKeySize() const;
Strings getKeysNames() const;
private:
/// range_min and range_max have to be parsed before this function call
std::vector<DictionaryAttribute> getAttributes(

View File

@ -26,6 +26,7 @@ namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int DICTIONARY_ACCESS_DENIED;
extern const int UNSUPPORTED_METHOD;
}
namespace
@ -65,18 +66,32 @@ ExecutableDictionarySource::ExecutableDictionarySource(
const Context & context_)
: log(&Poco::Logger::get("ExecutableDictionarySource"))
, dict_struct{dict_struct_}
, implicit_key{config.getBool(config_prefix + ".implicit_key", false)}
, command{config.getString(config_prefix + ".command")}
, update_field{config.getString(config_prefix + ".update_field", "")}
, format{config.getString(config_prefix + ".format")}
, sample_block{sample_block_}
, context(context_)
{
/// Remove keys from sample_block for implicit_key dictionary because
/// this columns will not be provided by client
if (implicit_key)
{
auto keys_names = dict_struct.getKeysNames();
for (auto & key_name : keys_names)
{
size_t key_column_position_in_block = sample_block.getPositionByName(key_name);
sample_block.erase(key_column_position_in_block);
}
}
}
ExecutableDictionarySource::ExecutableDictionarySource(const ExecutableDictionarySource & other)
: log(&Poco::Logger::get("ExecutableDictionarySource"))
, update_time{other.update_time}
, dict_struct{other.dict_struct}
, implicit_key{other.implicit_key}
, command{other.command}
, update_field{other.update_field}
, format{other.format}
@ -87,6 +102,9 @@ ExecutableDictionarySource::ExecutableDictionarySource(const ExecutableDictionar
BlockInputStreamPtr ExecutableDictionarySource::loadAll()
{
if (implicit_key)
throw Exception("ExecutableDictionarySource with implicit_key does not support loadAll method", ErrorCodes::UNSUPPORTED_METHOD);
LOG_TRACE(log, "loadAll {}", toString());
auto process = ShellCommand::execute(command);
auto input_stream = context.getInputFormat(format, process->out, sample_block, max_block_size);
@ -95,6 +113,9 @@ BlockInputStreamPtr ExecutableDictionarySource::loadAll()
BlockInputStreamPtr ExecutableDictionarySource::loadUpdatedAll()
{
if (implicit_key)
throw Exception("ExecutableDictionarySource with implicit_key does not support loadUpdatedAll method", ErrorCodes::UNSUPPORTED_METHOD);
time_t new_update_time = time(nullptr);
SCOPE_EXIT(update_time = new_update_time);
@ -173,6 +194,72 @@ namespace
std::function<void(WriteBufferFromFile &)> send_data;
ThreadFromGlobalPool thread;
};
/** A stream, adds additional columns to each block that it will read from inner stream.
*
* block_to_add rows size must be equal to final summ rows size of all inner stream readed blocks.
*/
class BlockInputStreamWithAdditionalColumns final: public IBlockInputStream
{
public:
BlockInputStreamWithAdditionalColumns(
Block block_to_add_,
std::unique_ptr<IBlockInputStream>&& stream_)
: block_to_add(std::move(block_to_add_))
, stream(std::move(stream_))
{
}
Block getHeader() const override
{
auto header = stream->getHeader();
if (header)
{
for (int64_t i = static_cast<uint64_t>(block_to_add.columns() - 1); i >= 0; --i)
header.insert(0, block_to_add.getByPosition(i).cloneEmpty());
}
return header;
}
Block readImpl() override
{
auto block = stream->read();
if (block)
{
auto block_rows = block.rows();
auto cut_block = block_to_add.cloneWithCutColumns(current_range_index, block_rows);
for (int64_t i = static_cast<uint64_t>(cut_block.columns() - 1); i >= 0; --i)
block.insert(0, cut_block.getByPosition(i));
current_range_index += block_rows;
}
return block;
}
void readPrefix() override
{
stream->readPrefix();
}
void readSuffix() override
{
stream->readSuffix();
}
String getName() const override { return "BlockInputStreamWithAdditionalColumns"; }
private:
Block block_to_add;
std::unique_ptr<IBlockInputStream> stream;
size_t current_range_index = 0;
};
}
@ -180,28 +267,44 @@ BlockInputStreamPtr ExecutableDictionarySource::loadIds(const std::vector<UInt64
{
LOG_TRACE(log, "loadIds {} size = {}", toString(), ids.size());
return std::make_shared<BlockInputStreamWithBackgroundThread>(
auto block = blockForIds(ids);
auto stream = std::make_unique<BlockInputStreamWithBackgroundThread>(
context, format, sample_block, command, log,
[&ids, this](WriteBufferFromFile & out) mutable
[block, this](WriteBufferFromFile & out) mutable
{
auto output_stream = context.getOutputStream(format, out, sample_block);
formatIDs(output_stream, ids);
auto output_stream = context.getOutputStream(format, out, block.cloneEmpty());
formatWithBlock(output_stream, block);
out.close();
});
if (implicit_key)
{
return std::make_shared<BlockInputStreamWithAdditionalColumns>(block, std::move(stream));
}
else
return std::shared_ptr<BlockInputStreamWithBackgroundThread>(stream.release());
}
BlockInputStreamPtr ExecutableDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows)
{
LOG_TRACE(log, "loadKeys {} size = {}", toString(), requested_rows.size());
return std::make_shared<BlockInputStreamWithBackgroundThread>(
auto block = blockForKeys(dict_struct, key_columns, requested_rows);
auto stream = std::make_unique<BlockInputStreamWithBackgroundThread>(
context, format, sample_block, command, log,
[key_columns, &requested_rows, this](WriteBufferFromFile & out) mutable
[block, this](WriteBufferFromFile & out) mutable
{
auto output_stream = context.getOutputStream(format, out, sample_block);
formatKeys(dict_struct, output_stream, key_columns, requested_rows);
auto output_stream = context.getOutputStream(format, out, block.cloneEmpty());
formatWithBlock(output_stream, block);
out.close();
});
if (implicit_key)
return std::make_shared<BlockInputStreamWithAdditionalColumns>(block, std::move(stream));
else
return std::shared_ptr<BlockInputStreamWithBackgroundThread>(stream.release());
}
bool ExecutableDictionarySource::isModified() const

View File

@ -49,9 +49,9 @@ public:
private:
Poco::Logger * log;
time_t update_time = 0;
const DictionaryStructure dict_struct;
bool implicit_key;
const std::string command;
const std::string update_field;
const std::string format;

View File

@ -130,12 +130,14 @@ BlockInputStreamPtr HTTPDictionarySource::loadUpdatedAll()
BlockInputStreamPtr HTTPDictionarySource::loadIds(const std::vector<UInt64> & ids)
{
LOG_TRACE(log, "loadIds {} size = {}", toString(), ids.size());
auto block = blockForIds(ids);
ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = [&](std::ostream & ostr)
ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = [block, this](std::ostream & ostr)
{
WriteBufferFromOStream out_buffer(ostr);
auto output_stream = context.getOutputStream(format, out_buffer, sample_block);
formatIDs(output_stream, ids);
formatWithBlock(output_stream, block);
};
Poco::URI uri(url);
@ -150,11 +152,13 @@ BlockInputStreamPtr HTTPDictionarySource::loadKeys(const Columns & key_columns,
{
LOG_TRACE(log, "loadKeys {} size = {}", toString(), requested_rows.size());
ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = [&](std::ostream & ostr)
auto block = blockForKeys(dict_struct, key_columns, requested_rows);
ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = [block, this](std::ostream & ostr)
{
WriteBufferFromOStream out_buffer(ostr);
auto output_stream = context.getOutputStream(format, out_buffer, sample_block);
formatKeys(dict_struct, output_stream, key_columns, requested_rows);
formatWithBlock(output_stream, block);
};
Poco::URI uri(url);

View File

@ -105,4 +105,152 @@
</structure>
</dictionary>
<dictionary>
<name>simple_executable_cache_dictionary_no_implicit_key</name>
<structure>
<id>
<name>id</name>
<type>UInt64</type>
</id>
<attribute>
<name>value</name>
<type>String</type>
<null_value></null_value>
</attribute>
</structure>
<source>
<executable>
<command>echo "1\tValue"</command>
<format>TabSeparated</format>
<implicit_key>false</implicit_key>
</executable>
</source>
<layout>
<cache>
<size_in_cells>10000</size_in_cells>
</cache>
</layout>
<lifetime>300</lifetime>
</dictionary>
<dictionary>
<name>simple_executable_cache_dictionary_implicit_key</name>
<structure>
<id>
<name>id</name>
<type>UInt64</type>
</id>
<attribute>
<name>value</name>
<type>String</type>
<null_value></null_value>
</attribute>
</structure>
<source>
<executable>
<command>echo "Value"</command>
<format>TabSeparated</format>
<implicit_key>true</implicit_key>
</executable>
</source>
<layout>
<cache>
<size_in_cells>10000</size_in_cells>
</cache>
</layout>
<lifetime>300</lifetime>
</dictionary>
<dictionary>
<name>complex_executable_cache_dictionary_no_implicit_key</name>
<structure>
<key>
<attribute>
<name>id</name>
<type>UInt64</type>
<null_value></null_value>
</attribute>
<attribute>
<name>id_key</name>
<type>String</type>
<null_value></null_value>
</attribute>
</key>
<attribute>
<name>value</name>
<type>String</type>
<null_value></null_value>
</attribute>
</structure>
<source>
<executable>
<command>echo "1\tFirstKey\tValue"</command>
<format>TabSeparated</format>
<implicit_key>false</implicit_key>
</executable>
</source>
<layout>
<complex_key_cache>
<size_in_cells>10000</size_in_cells>
</complex_key_cache>
</layout>
<lifetime>300</lifetime>
</dictionary>
<dictionary>
<name>complex_executable_cache_dictionary_implicit_key</name>
<structure>
<key>
<attribute>
<name>id</name>
<type>UInt64</type>
<null_value></null_value>
</attribute>
<attribute>
<name>id_key</name>
<type>String</type>
<null_value></null_value>
</attribute>
</key>
<attribute>
<name>value</name>
<type>String</type>
<null_value></null_value>
</attribute>
</structure>
<source>
<executable>
<command>echo "Value"</command>
<format>TabSeparated</format>
<implicit_key>true</implicit_key>
</executable>
</source>
<layout>
<complex_key_cache>
<size_in_cells>10000</size_in_cells>
</complex_key_cache>
</layout>
<lifetime>300</lifetime>
</dictionary>
</dictionaries>

View File

@ -1,3 +1,8 @@
999999 1999998 999998000001
999999 1999998 999998000001
999999 1999998 999998000001
Check implicit_key option
Value
Value
Value
Value

View File

@ -1,3 +1,11 @@
SELECT number, dictGet('executable_complex', 'a', (number, number)) AS a, dictGet('executable_complex', 'b', (number, number)) AS b FROM numbers(1000000) WHERE number = 999999;
SELECT number, dictGet('executable_complex_direct', 'a', (number, number)) AS a, dictGet('executable_complex_direct', 'b', (number, number)) AS b FROM numbers(1000000) WHERE number = 999999;
SELECT number, dictGet('executable_simple', 'a', number) AS a, dictGet('executable_simple', 'b', number) AS b FROM numbers(1000000) WHERE number = 999999;
SELECT 'Check implicit_key option';
SELECT dictGet('simple_executable_cache_dictionary_no_implicit_key', 'value', toUInt64(1));
SELECT dictGet('simple_executable_cache_dictionary_implicit_key', 'value', toUInt64(1));
SELECT dictGet('complex_executable_cache_dictionary_no_implicit_key', 'value', (toUInt64(1), 'FirstKey'));
SELECT dictGet('complex_executable_cache_dictionary_implicit_key', 'value', (toUInt64(1), 'FirstKey'));