Merge pull request #34510 from kitaisreal/table-functions-insert-partition-by-refactoring

Improve performance of insert into table functions URL, S3, File, HDFS
This commit is contained in:
alexey-milovidov 2022-02-12 10:14:00 +03:00 committed by GitHub
commit ea71dc9d11
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 80 additions and 53 deletions

20
src/Common/ArenaUtils.h Normal file
View File

@ -0,0 +1,20 @@
#pragma once
#include <string.h>
#include <string>
#include <base/StringRef.h>
/** Copy string value into Arena and return a StringRef describing the copy.
  * Arena should support method:
  * char * alloc(size_t size).
  *
  * The result is built from the pointer returned by arena.alloc(value.size)
  * and from value.size; the bytes of value are copied into that memory.
  */
template <typename Arena>
inline StringRef copyStringInArena(Arena & arena, StringRef value)
{
    size_t key_size = value.size;
    char * place_for_key = arena.alloc(key_size);

    /// memcpy requires valid pointers even when the length is zero, and nothing here
    /// guarantees that value.data is non-null for an empty value, so skip the call
    /// when there is nothing to copy.
    if (key_size != 0)
        memcpy(place_for_key, value.data, key_size);

    return StringRef{place_for_key, key_size};
}

View File

@ -2,6 +2,7 @@
#include <base/StringRef.h>
#include <Common/HashTable/HashMap.h>
#include <Common/ArenaWithFreeLists.h>
#include <Common/ArenaUtils.h>
#include <unordered_map>
#include <list>
#include <atomic>
@ -115,17 +116,6 @@ private:
}
}
/// Copies the bytes of value_to_copy into memory taken from arena
/// and returns a StringRef over that copy.
StringRef copyStringInArena(const std::string & value_to_copy)
{
    const size_t length = value_to_copy.size();
    char * const destination = arena.alloc(length);
    memcpy(destination, value_to_copy.data(), length);
    return StringRef{destination, length};
}
public:
using iterator = typename List::iterator;
@ -139,7 +129,7 @@ public:
if (!it)
{
ListElem elem{copyStringInArena(key), value, true};
ListElem elem{copyStringInArena(arena, key), value, true};
auto itr = list.insert(list.end(), elem);
bool inserted;
map.emplace(itr->key, it, inserted, hash_value);
@ -161,7 +151,7 @@ public:
if (it == map.end())
{
ListElem elem{copyStringInArena(key), value, true};
ListElem elem{copyStringInArena(arena, key), value, true};
auto itr = list.insert(list.end(), elem);
bool inserted;
map.emplace(itr->key, it, inserted, hash_value);

View File

@ -8,10 +8,10 @@
#include <Common/randomSeed.h>
#include <Common/Arena.h>
#include <Common/ArenaWithFreeLists.h>
#include <Common/ArenaUtils.h>
#include <Common/HashTable/LRUHashMap.h>
#include <Dictionaries/DictionaryStructure.h>
#include <Dictionaries/ICacheDictionaryStorage.h>
#include <Dictionaries/DictionaryHelpers.h>
namespace DB

View File

@ -623,17 +623,6 @@ void mergeBlockWithPipe(
}
}
/** Copies value into memory obtained from arena and returns a StringRef to the copy.
  * Arena is expected to provide: char * alloc(size_t size).
  */
template <typename Arena>
static StringRef copyStringInArena(Arena & arena, StringRef value)
{
    size_t key_size = value.size;
    char * place_for_key = arena.alloc(key_size);

    /// memcpy requires valid pointers even when the length is zero, and nothing here
    /// guarantees that value.data is non-null for an empty value, so skip the call
    /// when there is nothing to copy.
    if (key_size != 0)
        memcpy(place_for_key, value.data, key_size);

    return StringRef{place_for_key, key_size};
}
/**
* Returns ColumnVector data as PaddedPodArray.

View File

@ -3,6 +3,7 @@
#include <Core/Defines.h>
#include <Common/HashTable/HashMap.h>
#include <Common/HashTable/HashSet.h>
#include <Common/ArenaUtils.h>
#include <DataTypes/DataTypesDecimal.h>
#include <IO/WriteHelpers.h>
@ -13,7 +14,7 @@
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Dictionaries//DictionarySource.h>
#include <Dictionaries/DictionarySource.h>
#include <Dictionaries/DictionaryFactory.h>
#include <Dictionaries/HierarchyDictionariesUtils.h>

View File

@ -1,5 +1,6 @@
#include "HashedArrayDictionary.h"
#include <Common/ArenaUtils.h>
#include <Core/Defines.h>
#include <DataTypes/DataTypesDecimal.h>
#include <Columns/ColumnsNumber.h>

View File

@ -1,5 +1,6 @@
#include "HashedDictionary.h"
#include <Common/ArenaUtils.h>
#include <Core/Defines.h>
#include <DataTypes/DataTypesDecimal.h>
#include <Columns/ColumnsNumber.h>

View File

@ -1,5 +1,7 @@
#include <Dictionaries/RangeHashedDictionary.h>
#include <Common/ArenaUtils.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypeEnum.h>

View File

@ -16,6 +16,7 @@
#include <Common/randomSeed.h>
#include <Common/Arena.h>
#include <Common/ArenaWithFreeLists.h>
#include <Common/ArenaUtils.h>
#include <Common/MemorySanitizer.h>
#include <Common/CurrentMetrics.h>
#include <Common/HashTable/HashMap.h>

View File

@ -1,5 +1,7 @@
#include "PartitionedSink.h"
#include <Common/ArenaUtils.h>
#include <Functions/FunctionsConversion.h>
#include <Interpreters/Context.h>
@ -40,19 +42,18 @@ PartitionedSink::PartitionedSink(
}
/// Returns the sink for the given partition. When the lookup map has no entry for it,
/// a sink is created with createSinkForPartition and stored in the map before being returned.
/// NOTE(review): this hunk shows the removed getSinkForPartition(const String &) lines next to
/// the added getSinkForPartitionKey(StringRef) lines, so each changed statement appears twice
/// (old form first, new form second).
SinkPtr PartitionedSink::getSinkForPartition(const String & partition_id)
SinkPtr PartitionedSink::getSinkForPartitionKey(StringRef partition_key)
{
auto it = sinks.find(partition_id);
if (it == sinks.end())
auto it = partition_id_to_sink.find(partition_key);
if (it == partition_id_to_sink.end())
{
auto sink = createSinkForPartition(partition_id);
std::tie(it, std::ignore) = sinks.emplace(partition_id, sink);
/// createSinkForPartition is called with a String, hence the toString() conversion of the key.
auto sink = createSinkForPartition(partition_key.toString());
/// NOTE(review): the map key is the StringRef itself; presumably the bytes it refers to must
/// outlive the map entry - confirm that callers only pass keys copied into partition_keys_arena.
std::tie(it, std::ignore) = partition_id_to_sink.emplace(partition_key, sink);
}
return it->second;
}
void PartitionedSink::consume(Chunk chunk)
{
const auto & columns = chunk.getColumns();
@ -61,45 +62,59 @@ void PartitionedSink::consume(Chunk chunk)
block_with_partition_by_expr.setColumns(columns);
partition_by_expr->execute(block_with_partition_by_expr);
const auto * column = block_with_partition_by_expr.getByName(partition_by_column_name).column.get();
const auto * partition_by_result_column = block_with_partition_by_expr.getByName(partition_by_column_name).column.get();
std::unordered_map<String, size_t> sub_chunks_indices;
IColumn::Selector selector;
for (size_t row = 0; row < chunk.getNumRows(); ++row)
size_t chunk_rows = chunk.getNumRows();
chunk_row_index_to_partition_index.resize(chunk_rows);
partition_id_to_chunk_index.clear();
for (size_t row = 0; row < chunk_rows; ++row)
{
auto value = column->getDataAt(row);
auto [it, inserted] = sub_chunks_indices.emplace(value, sub_chunks_indices.size());
selector.push_back(it->second);
auto partition_key = partition_by_result_column->getDataAt(row);
auto [it, inserted] = partition_id_to_chunk_index.insert(makePairNoInit(partition_key, partition_id_to_chunk_index.size()));
if (inserted)
it->value.first = copyStringInArena(partition_keys_arena, partition_key);
chunk_row_index_to_partition_index[row] = it->getMapped();
}
Chunks sub_chunks;
sub_chunks.reserve(sub_chunks_indices.size());
for (size_t column_index = 0; column_index < columns.size(); ++column_index)
size_t columns_size = columns.size();
size_t partitions_size = partition_id_to_chunk_index.size();
Chunks partition_index_to_chunk;
partition_index_to_chunk.reserve(partitions_size);
for (size_t column_index = 0; column_index < columns_size; ++column_index)
{
MutableColumns column_sub_chunks = columns[column_index]->scatter(sub_chunks_indices.size(), selector);
if (column_index == 0) /// Set sizes for sub-chunks.
MutableColumns partition_index_to_column_split = columns[column_index]->scatter(partitions_size, chunk_row_index_to_partition_index);
/// Add chunks into partition_index_to_chunk with sizes of result columns
if (column_index == 0)
{
for (const auto & column_sub_chunk : column_sub_chunks)
for (const auto & partition_column : partition_index_to_column_split)
{
sub_chunks.emplace_back(Columns(), column_sub_chunk->size());
partition_index_to_chunk.emplace_back(Columns(), partition_column->size());
}
}
for (size_t sub_chunk_index = 0; sub_chunk_index < column_sub_chunks.size(); ++sub_chunk_index)
for (size_t partition_index = 0; partition_index < partitions_size; ++partition_index)
{
sub_chunks[sub_chunk_index].addColumn(std::move(column_sub_chunks[sub_chunk_index]));
partition_index_to_chunk[partition_index].addColumn(std::move(partition_index_to_column_split[partition_index]));
}
}
for (const auto & [partition_id, sub_chunk_index] : sub_chunks_indices)
for (const auto & [partition_key, partition_index] : partition_id_to_chunk_index)
{
getSinkForPartition(partition_id)->consume(std::move(sub_chunks[sub_chunk_index]));
auto sink = getSinkForPartitionKey(partition_key);
sink->consume(std::move(partition_index_to_chunk[partition_index]));
}
}
void PartitionedSink::onFinish()
{
for (auto & [partition_id, sink] : sinks)
for (auto & [_, sink] : partition_id_to_sink)
{
sink->onFinish();
}

View File

@ -1,5 +1,8 @@
#pragma once
#include <Common/HashTable/HashMap.h>
#include <Common/Arena.h>
#include <absl/container/flat_hash_map.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/Context_fwd.h>
@ -34,9 +37,13 @@ private:
ExpressionActionsPtr partition_by_expr;
String partition_by_column_name;
std::unordered_map<String, SinkPtr> sinks;
absl::flat_hash_map<StringRef, SinkPtr> partition_id_to_sink;
HashMapWithSavedHash<StringRef, size_t> partition_id_to_chunk_index;
IColumn::Selector chunk_row_index_to_partition_index;
Arena partition_keys_arena;
SinkPtr getSinkForPartitionKey(StringRef partition_key);
SinkPtr getSinkForPartition(const String & partition_id);
};
}