Merge pull request #33682 from kitaisreal/dictionary-source-coordinator-fix

DictionarySourceCoordinator update interface
This commit is contained in:
Maksim Kita 2022-01-16 17:40:02 +01:00 committed by GitHub
commit 8f24bd1069
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 123 additions and 146 deletions

View File

@ -504,17 +504,10 @@ Pipe CacheDictionary<dictionary_key_type>::read(const Names & column_names, size
}
std::shared_ptr<const IDictionary> dictionary = shared_from_this();
auto coordinator = std::make_shared<DictionarySourceCoordinator>(dictionary, column_names, std::move(key_columns), max_block_size);
auto coordinator = DictionarySourceCoordinator::create(dictionary, column_names, std::move(key_columns), max_block_size);
auto result = coordinator->read(num_streams);
Pipes pipes;
for (size_t i = 0; i < num_streams; ++i)
{
auto source = std::make_shared<DictionarySource>(coordinator);
pipes.emplace_back(Pipe(std::move(source)));
}
return Pipe::unitePipes(std::move(pipes));
return result;
}
template <DictionaryKeyType dictionary_key_type>

View File

@ -1,6 +1,7 @@
#include "DictionarySource.h"
#include <Dictionaries/DictionaryHelpers.h>
namespace DB
{
@ -10,12 +11,95 @@ namespace ErrorCodes
extern const int NO_SUCH_COLUMN_IN_TABLE;
}
class DictionarySource : public SourceWithProgress
{
public:
explicit DictionarySource(std::shared_ptr<DictionarySourceCoordinator> coordinator_)
: SourceWithProgress(coordinator_->getHeader()), coordinator(std::move(coordinator_))
{
}
private:
String getName() const override { return "DictionarySource"; }
Chunk generate() override
{
ColumnsWithTypeAndName key_columns_to_read;
ColumnsWithTypeAndName data_columns;
if (!coordinator->getKeyColumnsNextRangeToRead(key_columns_to_read, data_columns))
return {};
const auto & header = coordinator->getHeader();
std::vector<ColumnPtr> key_columns;
std::vector<DataTypePtr> key_types;
key_columns.reserve(key_columns_to_read.size());
key_types.reserve(key_columns_to_read.size());
std::unordered_map<std::string_view, ColumnPtr> name_to_column;
for (const auto & key_column_to_read : key_columns_to_read)
{
key_columns.emplace_back(key_column_to_read.column);
key_types.emplace_back(key_column_to_read.type);
if (header.has(key_column_to_read.name))
name_to_column.emplace(key_column_to_read.name, key_column_to_read.column);
}
for (const auto & data_column : data_columns)
{
if (header.has(data_column.name))
name_to_column.emplace(data_column.name, data_column.column);
}
const auto & attributes_names_to_read = coordinator->getAttributesNamesToRead();
const auto & attributes_types_to_read = coordinator->getAttributesTypesToRead();
const auto & attributes_default_values_columns = coordinator->getAttributesDefaultValuesColumns();
const auto & dictionary = coordinator->getDictionary();
auto attributes_columns = dictionary->getColumns(
attributes_names_to_read,
attributes_types_to_read,
key_columns,
key_types,
attributes_default_values_columns);
for (size_t i = 0; i < attributes_names_to_read.size(); ++i)
{
const auto & attribute_name = attributes_names_to_read[i];
name_to_column.emplace(attribute_name, attributes_columns[i]);
}
std::vector<ColumnPtr> result_columns;
result_columns.reserve(header.columns());
for (const auto & column_with_type : header)
{
const auto & header_name = column_with_type.name;
auto it = name_to_column.find(header_name);
if (it == name_to_column.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Column name {} not found in result columns", header_name);
result_columns.emplace_back(it->second);
}
size_t rows_size = result_columns[0]->size();
return Chunk(result_columns, rows_size);
}
std::shared_ptr<DictionarySourceCoordinator> coordinator;
};
bool DictionarySourceCoordinator::getKeyColumnsNextRangeToRead(ColumnsWithTypeAndName & key_columns, ColumnsWithTypeAndName & data_columns)
{
size_t read_block_index = parallel_read_block_index++;
size_t start = read_block_index * max_block_size;
size_t end = (read_block_index + 1) * max_block_size;
size_t start = max_block_size * read_block_index;
size_t end = max_block_size * (read_block_index + 1);
size_t keys_size = key_columns_with_type[0].column->size();
@ -112,73 +196,20 @@ DictionarySourceCoordinator::cutColumns(const ColumnsWithTypeAndName & columns_w
return result;
}
Chunk DictionarySource::generate()
Pipe DictionarySourceCoordinator::read(size_t num_streams)
{
ColumnsWithTypeAndName key_columns_to_read;
ColumnsWithTypeAndName data_columns;
Pipes pipes;
pipes.reserve(num_streams);
if (!coordinator->getKeyColumnsNextRangeToRead(key_columns_to_read, data_columns))
return {};
auto coordinator = shared_from_this();
const auto & header = coordinator->getHeader();
std::vector<ColumnPtr> key_columns;
std::vector<DataTypePtr> key_types;
key_columns.reserve(key_columns_to_read.size());
key_types.reserve(key_columns_to_read.size());
std::unordered_map<std::string_view, ColumnPtr> name_to_column;
for (const auto & key_column_to_read : key_columns_to_read)
for (size_t i = 0; i < num_streams; ++i)
{
key_columns.emplace_back(key_column_to_read.column);
key_types.emplace_back(key_column_to_read.type);
if (header.has(key_column_to_read.name))
name_to_column.emplace(key_column_to_read.name, key_column_to_read.column);
auto source = std::make_shared<DictionarySource>(coordinator);
pipes.emplace_back(Pipe(std::move(source)));
}
for (const auto & data_column : data_columns)
{
if (header.has(data_column.name))
name_to_column.emplace(data_column.name, data_column.column);
}
const auto & attributes_names_to_read = coordinator->getAttributesNamesToRead();
const auto & attributes_types_to_read = coordinator->getAttributesTypesToRead();
const auto & attributes_default_values_columns = coordinator->getAttributesDefaultValuesColumns();
const auto & dictionary = coordinator->getDictionary();
auto attributes_columns = dictionary->getColumns(
attributes_names_to_read,
attributes_types_to_read,
key_columns,
key_types,
attributes_default_values_columns);
for (size_t i = 0; i < attributes_names_to_read.size(); ++i)
{
const auto & attribute_name = attributes_names_to_read[i];
name_to_column.emplace(attribute_name, attributes_columns[i]);
}
std::vector<ColumnPtr> result_columns;
result_columns.reserve(header.columns());
for (const auto & column_with_type : header)
{
const auto & header_name = column_with_type.name;
auto it = name_to_column.find(header_name);
if (it == name_to_column.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Column name {} not found in result columns", header_name);
result_columns.emplace_back(it->second);
}
size_t rows_size = result_columns[0]->size();
return Chunk(result_columns, rows_size);
return Pipe::unitePipes(std::move(pipes));
}
}

View File

@ -1,12 +1,8 @@
#pragma once
#include <memory>
#include <Columns/ColumnDecimal.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnVector.h>
#include <Columns/IColumn.h>
#include <Core/Names.h>
#include <DataTypes/DataTypesNumber.h>
#include <Processors/Sources/SourceWithProgress.h>
#include <Dictionaries/DictionaryStructure.h>
#include <Dictionaries/IDictionary.h>
@ -15,10 +11,18 @@
namespace DB
{
class DictionarySourceCoordinator
class DictionarySource;
class DictionarySourceCoordinator final : public shared_ptr_helper<DictionarySourceCoordinator>, public std::enable_shared_from_this<DictionarySourceCoordinator>
{
friend struct shared_ptr_helper<DictionarySourceCoordinator>;
public:
Pipe read(size_t num_streams);
private:
explicit DictionarySourceCoordinator(
std::shared_ptr<const IDictionary> dictionary_,
const Names & column_names,
@ -45,6 +49,8 @@ public:
initialize(column_names);
}
friend class DictionarySource;
bool getKeyColumnsNextRangeToRead(ColumnsWithTypeAndName & key_columns, ColumnsWithTypeAndName & data_columns);
const Block & getHeader() const { return header; }
@ -57,7 +63,6 @@ public:
const std::shared_ptr<const IDictionary> & getDictionary() const { return dictionary; }
private:
void initialize(const Names & column_names);
static ColumnsWithTypeAndName cutColumns(const ColumnsWithTypeAndName & columns_with_type, size_t start, size_t length);
@ -77,21 +82,4 @@ private:
std::atomic<size_t> parallel_read_block_index = 0;
};
class DictionarySource : public SourceWithProgress
{
public:
explicit DictionarySource(std::shared_ptr<DictionarySourceCoordinator> coordinator_)
: SourceWithProgress(coordinator_->getHeader()), coordinator(std::move(coordinator_))
{
}
private:
String getName() const override { return "DictionarySource"; }
Chunk generate() override;
std::shared_ptr<DictionarySourceCoordinator> coordinator;
};
}

View File

@ -550,17 +550,10 @@ Pipe FlatDictionary::read(const Names & column_names, size_t max_block_size, siz
ColumnsWithTypeAndName key_columns = {ColumnWithTypeAndName(getColumnFromPODArray(keys), std::make_shared<DataTypeUInt64>(), dict_struct.id->name)};
std::shared_ptr<const IDictionary> dictionary = shared_from_this();
auto coordinator = std::make_shared<DictionarySourceCoordinator>(dictionary, column_names, std::move(key_columns), max_block_size);
auto coordinator = DictionarySourceCoordinator::create(dictionary, column_names, std::move(key_columns), max_block_size);
auto result = coordinator->read(num_streams);
Pipes pipes;
for (size_t i = 0; i < num_streams; ++i)
{
auto source = std::make_shared<DictionarySource>(coordinator);
pipes.emplace_back(Pipe(std::move(source)));
}
return Pipe::unitePipes(std::move(pipes));
return result;
}
void registerDictionaryFlat(DictionaryFactory & factory)

View File

@ -758,17 +758,10 @@ Pipe HashedArrayDictionary<dictionary_key_type>::read(const Names & column_names
key_columns = deserializeColumnsWithTypeAndNameFromKeys(dict_struct, keys, 0, keys.size());
std::shared_ptr<const IDictionary> dictionary = shared_from_this();
auto coordinator = std::make_shared<DictionarySourceCoordinator>(dictionary, column_names, std::move(key_columns), max_block_size);
auto coordinator = DictionarySourceCoordinator::create(dictionary, column_names, std::move(key_columns), max_block_size);
auto result = coordinator->read(num_streams);
Pipes pipes;
for (size_t i = 0; i < num_streams; ++i)
{
auto source = std::make_shared<DictionarySource>(coordinator);
pipes.emplace_back(Pipe(std::move(source)));
}
return Pipe::unitePipes(std::move(pipes));
return result;
}
template class HashedArrayDictionary<DictionaryKeyType::Simple>;

View File

@ -666,17 +666,10 @@ Pipe HashedDictionary<dictionary_key_type, sparse>::read(const Names & column_na
key_columns = deserializeColumnsWithTypeAndNameFromKeys(dict_struct, keys, 0, keys.size());
std::shared_ptr<const IDictionary> dictionary = shared_from_this();
auto coordinator = std::make_shared<DictionarySourceCoordinator>(dictionary, column_names, std::move(key_columns), max_block_size);
auto coordinator = DictionarySourceCoordinator::create(dictionary, column_names, std::move(key_columns), max_block_size);
auto result = coordinator->read(num_streams);
Pipes pipes;
for (size_t i = 0; i < num_streams; ++i)
{
auto source = std::make_shared<DictionarySource>(coordinator);
pipes.emplace_back(Pipe(std::move(source)));
}
return Pipe::unitePipes(std::move(pipes));
return result;
}
template <DictionaryKeyType dictionary_key_type, bool sparse>

View File

@ -866,17 +866,10 @@ Pipe IPAddressDictionary::read(const Names & column_names, size_t max_block_size
}
std::shared_ptr<const IDictionary> dictionary = shared_from_this();
auto coordinator = std::make_shared<DictionarySourceCoordinator>(dictionary, column_names, std::move(key_columns_with_type), std::move(view_columns), max_block_size);
auto coordinator = DictionarySourceCoordinator::create(dictionary, column_names, std::move(key_columns_with_type), std::move(view_columns), max_block_size);
auto result = coordinator->read(num_streams);
Pipes pipes;
for (size_t i = 0; i < num_streams; ++i)
{
auto source = std::make_shared<DictionarySource>(coordinator);
pipes.emplace_back(Pipe(std::move(source)));
}
return Pipe::unitePipes(std::move(pipes));
return result;
}
IPAddressDictionary::RowIdxConstIter IPAddressDictionary::ipNotFound() const

View File

@ -736,17 +736,10 @@ Pipe RangeHashedDictionary<dictionary_key_type>::read(const Names & column_names
ColumnsWithTypeAndName data_columns = {std::move(range_min_column), std::move(range_max_column)};
std::shared_ptr<const IDictionary> dictionary = shared_from_this();
auto coordinator = std::make_shared<DictionarySourceCoordinator>(dictionary, column_names, std::move(key_columns), std::move(data_columns), max_block_size);
auto coordinator = DictionarySourceCoordinator::create(dictionary, column_names, std::move(key_columns), std::move(data_columns), max_block_size);
auto result = coordinator->read(num_streams);
Pipes pipes;
for (size_t i = 0; i < num_streams; ++i)
{
auto source = std::make_shared<DictionarySource>(coordinator);
pipes.emplace_back(Pipe(std::move(source)));
}
return Pipe::unitePipes(std::move(pipes));
return result;
}