StorageDistributed: write to multiple local replicas, post-review corrections. [#METR-12221]

This commit is contained in:
Andrey Mironov 2014-08-22 18:05:34 +04:00
parent 5523bdeaca
commit 09265f50f9
6 changed files with 104 additions and 57 deletions

View File

@ -34,7 +34,7 @@ public:
/// contains names of directories for asynchronous write to StorageDistributed
std::vector<std::string> dir_names;
int weight;
bool has_local_node;
size_t num_local_nodes;
};
std::vector<ShardInfo> shard_info_vec;
std::vector<size_t> slot_to_shard;

View File

@ -20,7 +20,7 @@ namespace
for (auto it = boost::make_split_iterator(name, boost::first_finder(",")); it != decltype(it){}; ++it)
{
const auto & address = boost::copy_range<std::string>(*it);
const auto address = boost::copy_range<std::string>(*it);
const auto user_pw_end = strchr(address.data(), '@');
const auto colon = strchr(address.data(), ':');
@ -65,8 +65,8 @@ public:
~DirectoryMonitor()
{
{
std::lock_guard<std::mutex> lock{mutex};
quit = true;
std::lock_guard<std::mutex> lock{mutex};
}
cond.notify_one();
thread.join();

View File

@ -41,56 +41,111 @@ public:
}
private:
template <typename T>
static std::vector<IColumn::Filter> createFiltersImpl(const size_t num_rows, const IColumn * column, const Cluster & cluster)
{
const auto total_weight = cluster.slot_to_shard.size();
const auto num_shards = cluster.shard_info_vec.size();
std::vector<IColumn::Filter> filters(num_shards);
/// const columns contain only one value, therefore we do not need to read it at every iteration
if (column->isConst())
{
const auto data = typeid_cast<const ColumnConst<T> *>(column)->getData();
const auto shard_num = cluster.slot_to_shard[data % total_weight];
for (size_t i = 0; i < num_shards; ++i)
filters[i].assign(num_rows, static_cast<UInt8>(shard_num == i));
}
else
{
const auto & data = typeid_cast<const ColumnVector<T> *>(column)->getData();
for (size_t i = 0; i < num_shards; ++i)
{
filters[i].resize(num_rows);
for (size_t j = 0; j < num_rows; ++j)
filters[i][j] = cluster.slot_to_shard[data[j] % total_weight] == i;
}
}
return filters;
}
/** Executes the sharding key expression on `block` and builds one row filter per shard
  * by dispatching to the createFiltersImpl instantiation matching the key column's type name.
  *
  * Throws Exception with ErrorCodes::TYPE_MISMATCH when the key column's type name is not one
  * of the integer types registered below.
  */
std::vector<IColumn::Filter> createFilters(Block block)
{
    using FilterBuilder = std::vector<IColumn::Filter>(size_t, const IColumn *, const Cluster &);

    /// type name => createFiltersImpl instantiation for that integer type
    static std::unordered_map<std::string, FilterBuilder *> builders{
        { TypeName<UInt8>::get(), &createFiltersImpl<UInt8> },
        { TypeName<UInt16>::get(), &createFiltersImpl<UInt16> },
        { TypeName<UInt32>::get(), &createFiltersImpl<UInt32> },
        { TypeName<UInt64>::get(), &createFiltersImpl<UInt64> },
        { TypeName<Int8>::get(), &createFiltersImpl<Int8> },
        { TypeName<Int16>::get(), &createFiltersImpl<Int16> },
        { TypeName<Int32>::get(), &createFiltersImpl<Int32> },
        { TypeName<Int64>::get(), &createFiltersImpl<Int64> },
    };

    /// `block` is taken by value; the expression is executed on this copy
    storage.getShardingKeyExpr()->execute(block);
    const auto & key = block.getByName(storage.getShardingKeyColumnName());

    /// reject key columns whose type has no registered builder
    const auto found = builders.find(key.type->getName());
    if (found == builders.end())
        throw Exception{
            "Sharding key expression does not evaluate to an integer type",
            ErrorCodes::TYPE_MISMATCH
        };

    return found->second(block.rowsInFirstColumn(), key.column.get(), storage.cluster);
}
/// Splits `block` into per-shard parts according to the sharding key and passes every
/// non-empty part to writeImpl() together with the index of its shard.
/// NOTE(review): this span is a rendered diff - removed and added lines are interleaved
/// without +/- markers, so it does not compile as shown.
void writeSplit(const Block & block)
{
auto block_with_key = block;
storage.getShardingKeyExpr()->execute(block_with_key);
const auto & key_column = block_with_key.getByName(storage.getShardingKeyColumnName()).column;
const auto total_weight = storage.cluster.slot_to_shard.size();
/// shard => block mapping
std::vector<std::unique_ptr<Block>> target_blocks(storage.cluster.shard_info_vec.size());
const auto num_cols = block.columns();
/// cache column pointers for later reuse
std::vector<const IColumn*> columns(num_cols);
for (size_t i = 0; i < columns.size(); ++i)
columns[i] = block.getByPosition(i).column;
for (size_t num_rows = block.rowsInFirstColumn(), row = 0; row < num_rows; ++row)
/// one row filter per shard, computed from the sharding key
auto filters = createFilters(block);
const auto num_shards = storage.cluster.shard_info_vec.size();
for (size_t i = 0; i < num_shards; ++i)
{
/// the slot (key modulo total weight) selects the shard this row goes to
const auto target_block_idx = storage.cluster.slot_to_shard[key_column->get64(row) % total_weight];
auto & target_block = target_blocks[target_block_idx];
/// create the block for a shard only when the first row for it arrives
if (!target_block)
target_block = stdext::make_unique<Block>(block.cloneEmpty());
auto target_block = block.cloneEmpty();
for (size_t col = 0; col < num_cols; ++col)
target_block->getByPosition(col).column->insertFrom(*columns[col], row);
}
/// keep only the rows selected by the filter of shard i
target_block.getByPosition(col).column = columns[col]->filter(filters[i]);
for (size_t i = 0; i < target_blocks.size(); ++i)
if (const auto & target_block = target_blocks[i])
writeImpl(*target_block, i);
/// nothing is written for a shard whose part has no rows
if (target_block.rowsInFirstColumn())
writeImpl(target_block, i);
}
}
/// Writes `block` to the shard with index `shard_id` (defaults to 0): through writeToLocal()
/// when the shard has local nodes, and through writeToShard() when the shard has directories
/// listed in dir_names.
/// NOTE(review): this span is a rendered diff - the has_local_node and num_local_nodes
/// variants of the local write are the removed and the added version of the same statement.
void writeImpl(const Block & block, const size_t shard_id = 0)
{
const auto & shard_info = storage.cluster.shard_info_vec[shard_id];
if (shard_info.has_local_node)
writeToLocal(block);
/// the number of local nodes is passed on as the number of times the block is written locally
if (shard_info.num_local_nodes)
writeToLocal(block, shard_info.num_local_nodes);
/// dir_names is empty if shard has only local addresses
if (!shard_info.dir_names.empty())
writeToShard(block, shard_info.dir_names);
}
/// Executes the insert query (query_ast) with InterpreterInsertQuery in storage.context and
/// writes `block` to the resulting output stream `repeats` times, bracketed by a single
/// writePrefix() / writeSuffix() pair.
/// NOTE(review): this span is a rendered diff - the two signatures below are the removed and
/// the added version of the same declaration.
void writeToLocal(const Block & block)
void writeToLocal(const Block & block, const size_t repeats)
{
InterpreterInsertQuery interp{query_ast, storage.context};
auto block_io = interp.execute();
block_io.out->writePrefix();
/// the same block is written once per repeat
for (size_t i = 0; i < repeats; ++i)
block_io.out->write(block);
block_io.out->writeSuffix();
}
@ -102,7 +157,7 @@ private:
std::string first_file_tmp_path{};
auto first = true;
const auto & query_string = queryToString<ASTInsertQuery>(query_ast);
const auto & query_string = queryToString(query_ast);
/// write first file, hardlink the others
for (const auto & dir_name : dir_names)

View File

@ -4,13 +4,10 @@
namespace DB
{
/// Formats the AST pointed to by `query` into a string stream with formatAST() and returns
/// the resulting text.
/// NOTE(review): this span is a rendered diff - the template header, the typeid_cast line and
/// the first formatAST call are the removed version; formatAST(*query, ...) is the added one.
template <typename ASTType>
inline std::string queryToString(const ASTPtr & query)
{
const auto & query_ast = typeid_cast<const ASTType &>(*query);
std::ostringstream s;
formatAST(query_ast, s, 0, false, true);
formatAST(*query, s, 0, false, true);
return s.str();
}

View File

@ -105,7 +105,7 @@ Cluster::Cluster(const Settings & settings, const DataTypeFactory & data_type_fa
* the first element of vector; otherwise we will just .emplace_back
*/
std::vector<std::string> dir_names{};
auto has_local_node = false;
size_t num_local_nodes = 0;
auto first = true;
for (auto jt = replica_keys.begin(); jt != replica_keys.end(); ++jt)
@ -120,7 +120,7 @@ Cluster::Cluster(const Settings & settings, const DataTypeFactory & data_type_fa
if (isLocal(replica_addresses.back()))
{
has_local_node = true;
++num_local_nodes;
}
else
{
@ -143,7 +143,7 @@ Cluster::Cluster(const Settings & settings, const DataTypeFactory & data_type_fa
}
slot_to_shard.insert(std::end(slot_to_shard), weight, shard_info_vec.size());
shard_info_vec.push_back({std::move(dir_names), weight, has_local_node});
shard_info_vec.push_back({std::move(dir_names), weight, num_local_nodes});
}
else
throw Exception("Unknown element in config: " + *it, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);

View File

@ -20,35 +20,30 @@ namespace DB
namespace
{
/// NOTE(review): this span is a rendered diff - the rewriteImpl template and its
/// ASTSelectQuery specialization are the removed code, rewriteSelectQuery is the added code.
template <typename ASTType> void rewriteImpl(ASTType &, const std::string &, const std::string &) = delete;
/// select query has database and table names as AST pointers
template <> inline void rewriteImpl<ASTSelectQuery>(ASTSelectQuery & query,
const std::string & database, const std::string & table)
/// Creates a copy of the query and changes the database and table names.
inline ASTPtr rewriteSelectQuery(const ASTPtr & query, const std::string & database, const std::string & table)
{
query.database = new ASTIdentifier{{}, database, ASTIdentifier::Database};
query.table = new ASTIdentifier{{}, table, ASTIdentifier::Table};
/// work on a clone so that the query passed in is left untouched
auto modified_query_ast = query->clone();
auto & actual_query = typeid_cast<ASTSelectQuery &>(*modified_query_ast);
actual_query.database = new ASTIdentifier{{}, database, ASTIdentifier::Database};
actual_query.table = new ASTIdentifier{{}, table, ASTIdentifier::Table};
return modified_query_ast;
}
/// NOTE(review): this span is a rendered diff - the rewriteImpl<ASTInsertQuery> specialization
/// and the rewriteQuery template are the removed code, rewriteInsertQuery is the added code.
/// insert query has database and table names as bare strings
template <> inline void rewriteImpl<ASTInsertQuery>(ASTInsertQuery & query,
const std::string & database, const std::string & table)
{
query.database = database;
query.table = table;
/// make sure query is not INSERT SELECT
query.select = nullptr;
}
/// Creates a copy of the query and changes the database and table names.
template <typename ASTType>
inline ASTPtr rewriteQuery(const ASTPtr & query, const std::string & database, const std::string & table)
inline ASTPtr rewriteInsertQuery(const ASTPtr & query, const std::string & database, const std::string & table)
{
/// Create a copy of the query.
auto modified_query_ast = query->clone();
/// Change the table and database names.
rewriteImpl(typeid_cast<ASTType &>(*modified_query_ast), database, table);
auto & actual_query = typeid_cast<ASTInsertQuery &>(*modified_query_ast);
actual_query.database = database;
actual_query.table = table;
/// make sure query is not INSERT SELECT
actual_query.select = nullptr;
return modified_query_ast;
}
@ -131,9 +126,9 @@ BlockInputStreams StorageDistributed::read(
: QueryProcessingStage::WithMergeableState;
BlockInputStreams res;
const auto & modified_query_ast = rewriteQuery<ASTSelectQuery>(
const auto & modified_query_ast = rewriteSelectQuery(
query, remote_database, remote_table);
const auto & modified_query = queryToString<ASTSelectQuery>(modified_query_ast);
const auto & modified_query = queryToString(modified_query_ast);
/// Цикл по шардам.
for (auto & conn_pool : cluster.pools)
@ -172,7 +167,7 @@ BlockOutputStreamPtr StorageDistributed::write(ASTPtr query)
return new DistributedBlockOutputStream{
*this,
rewriteQuery<ASTInsertQuery>(query, remote_database, remote_table)
rewriteInsertQuery(query, remote_database, remote_table)
};
}