Fix using materialized column as sharding key.

This commit is contained in:
Vitaly Baranov 2021-10-03 23:06:31 +03:00
parent 217659c1c6
commit 1636ee24bb
3 changed files with 56 additions and 51 deletions

View File

@ -3,6 +3,8 @@
#include <Storages/StorageDistributed.h>
#include <Disks/StoragePolicy.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/formatAST.h>
#include <Parsers/queryToString.h>
@ -87,26 +89,40 @@ static void writeBlockConvert(PushingPipelineExecutor & executor, const Block &
}
static ASTPtr createInsertToRemoteTableQuery(const std::string & database, const std::string & table, const Names & column_names)
{
auto query = std::make_shared<ASTInsertQuery>();
query->table_id = StorageID(database, table);
auto columns = std::make_shared<ASTExpressionList>();
query->columns = columns;
query->children.push_back(columns);
for (const auto & column_name : column_names)
columns->children.push_back(std::make_shared<ASTIdentifier>(column_name));
return query;
}
DistributedSink::DistributedSink(
ContextPtr context_,
StorageDistributed & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const ASTPtr & query_ast_,
const ClusterPtr & cluster_,
bool insert_sync_,
UInt64 insert_timeout_,
StorageID main_table_)
StorageID main_table_,
const Names & columns_to_send_)
: SinkToStorage(metadata_snapshot_->getSampleBlock())
, context(Context::createCopy(context_))
, storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, query_ast(query_ast_)
, query_string(queryToString(query_ast_))
, query_ast(createInsertToRemoteTableQuery(main_table_.database_name, main_table_.table_name, columns_to_send_))
, query_string(queryToString(query_ast))
, cluster(cluster_)
, insert_sync(insert_sync_)
, allow_materialized(context->getSettingsRef().insert_allow_materialized_columns)
, insert_timeout(insert_timeout_)
, main_table(main_table_)
, columns_to_send(columns_to_send_.begin(), columns_to_send_.end())
, log(&Poco::Logger::get("DistributedBlockOutputStream"))
{
const auto & settings = context->getSettingsRef();
@ -127,27 +143,25 @@ void DistributedSink::consume(Chunk chunk)
auto ordinary_block = getHeader().cloneWithColumns(chunk.detachColumns());
if (!allow_materialized)
{
/* They are added by the AddingDefaultBlockOutputStream, and we will get
* different number of columns eventually */
for (const auto & col : metadata_snapshot->getColumns().getMaterialized())
{
if (ordinary_block.has(col.name))
{
ordinary_block.erase(col.name);
LOG_DEBUG(log, "{}: column {} will be removed, because it is MATERIALIZED",
storage.getStorageID().getNameForLogs(), col.name);
}
}
}
if (insert_sync)
writeSync(ordinary_block);
else
writeAsync(ordinary_block);
}
Block DistributedSink::removeSuperfluousColumns(Block block) const
{
for (size_t i = block.columns(); i;)
{
--i;
if (!columns_to_send.contains(block.getByPosition(i).name))
block.erase(i);
}
return block;
}
void DistributedSink::writeAsync(const Block & block)
{
if (random_shard_insert)
@ -402,6 +416,8 @@ void DistributedSink::writeSync(const Block & block)
{
const Settings & settings = context->getSettingsRef();
const auto & shards_info = cluster->getShardsInfo();
Block block_to_send = removeSuperfluousColumns(block);
size_t start = 0;
size_t end = shards_info.size();
@ -414,7 +430,7 @@ void DistributedSink::writeSync(const Block & block)
if (!pool)
{
/// Deferred initialization. Only for sync insertion.
initWritingJobs(block, start, end);
initWritingJobs(block_to_send, start, end);
size_t jobs_count = random_shard_insert ? 1 : (remote_jobs_count + local_jobs_count);
size_t max_threads = std::min<size_t>(settings.max_distributed_connections, jobs_count);
@ -459,7 +475,7 @@ void DistributedSink::writeSync(const Block & block)
finished_jobs_count = 0;
for (size_t shard_index : collections::range(start, end))
for (JobReplica & job : per_shard_jobs[shard_index].replicas_jobs)
pool->scheduleOrThrowOnError(runWritingJob(job, block, num_shards));
pool->scheduleOrThrowOnError(runWritingJob(job, block_to_send, num_shards));
}
catch (...)
{
@ -584,12 +600,13 @@ void DistributedSink::writeAsyncImpl(const Block & block, size_t shard_id)
{
const auto & shard_info = cluster->getShardsInfo()[shard_id];
const auto & settings = context->getSettingsRef();
Block block_to_send = removeSuperfluousColumns(block);
if (shard_info.hasInternalReplication())
{
if (shard_info.isLocal() && settings.prefer_localhost_replica)
/// Prefer insert into current instance directly
writeToLocal(block, shard_info.getLocalNodeCount());
writeToLocal(block_to_send, shard_info.getLocalNodeCount());
else
{
const auto & path = shard_info.insertPathForInternalReplication(
@ -597,13 +614,13 @@ void DistributedSink::writeAsyncImpl(const Block & block, size_t shard_id)
settings.use_compact_format_in_distributed_parts_names);
if (path.empty())
throw Exception("Directory name for async inserts is empty", ErrorCodes::LOGICAL_ERROR);
writeToShard(block, {path});
writeToShard(block_to_send, {path});
}
}
else
{
if (shard_info.isLocal() && settings.prefer_localhost_replica)
writeToLocal(block, shard_info.getLocalNodeCount());
writeToLocal(block_to_send, shard_info.getLocalNodeCount());
std::vector<std::string> dir_names;
for (const auto & address : cluster->getShardsAddresses()[shard_id])
@ -611,7 +628,7 @@ void DistributedSink::writeAsyncImpl(const Block & block, size_t shard_id)
dir_names.push_back(address.toFullString(settings.use_compact_format_in_distributed_parts_names));
if (!dir_names.empty())
writeToShard(block, dir_names);
writeToShard(block_to_send, dir_names);
}
}

View File

@ -43,11 +43,11 @@ public:
ContextPtr context_,
StorageDistributed & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const ASTPtr & query_ast_,
const ClusterPtr & cluster_,
bool insert_sync_,
UInt64 insert_timeout_,
StorageID main_table_);
StorageID main_table_,
const Names & columns_to_send_);
String getName() const override { return "DistributedSink"; }
void consume(Chunk chunk) override;
@ -65,6 +65,9 @@ private:
void writeAsyncImpl(const Block & block, size_t shard_id = 0);
/// Removes columns which should not be sent to shards.
Block removeSuperfluousColumns(Block block) const;
/// Increments finished_writings_count after each repeat.
void writeToLocal(const Block & block, size_t repeats);
@ -84,7 +87,9 @@ private:
/// Returns the number of blocks was written for each cluster node. Uses during exception handling.
std::string getCurrentStateDescription();
/// Context used for writing to remote tables.
ContextMutablePtr context;
StorageDistributed & storage;
StorageMetadataPtr metadata_snapshot;
ASTPtr query_ast;
@ -102,6 +107,7 @@ private:
/// Sync-related stuff
UInt64 insert_timeout; // in seconds
StorageID main_table;
NameSet columns_to_send;
Stopwatch watch;
Stopwatch watch_current_block;
std::optional<ThreadPool> pool;

View File

@ -159,23 +159,6 @@ ASTPtr rewriteSelectQuery(const ASTPtr & query, const std::string & database, co
return modified_query_ast;
}
/// The columns list in the original INSERT query is incorrect because inserted blocks are transformed
/// to the form of the sample block of the Distributed table. So we rewrite it and add all columns from
/// the sample block instead.
ASTPtr createInsertToRemoteTableQuery(const std::string & database, const std::string & table, const Block & sample_block)
{
auto query = std::make_shared<ASTInsertQuery>();
query->table_id = StorageID(database, table);
auto columns = std::make_shared<ASTExpressionList>();
query->columns = columns;
query->children.push_back(columns);
for (const auto & col : sample_block)
columns->children.push_back(std::make_shared<ASTIdentifier>(col.name));
return query;
}
/// Calculate maximum number in file names in directory and all subdirectories.
/// To ensure global order of data blocks yet to be sent across server restarts.
UInt64 getMaximumFileNumber(const std::string & dir_path)
@ -682,17 +665,16 @@ SinkToStoragePtr StorageDistributed::write(const ASTPtr &, const StorageMetadata
bool insert_sync = settings.insert_distributed_sync || settings.insert_shard_id || owned_cluster;
auto timeout = settings.insert_distributed_timeout;
Block sample_block;
if (!settings.insert_allow_materialized_columns)
sample_block = metadata_snapshot->getSampleBlockNonMaterialized();
Names columns_to_send;
if (settings.insert_allow_materialized_columns)
columns_to_send = metadata_snapshot->getSampleBlock().getNames();
else
sample_block = metadata_snapshot->getSampleBlock();
columns_to_send = metadata_snapshot->getSampleBlockNonMaterialized().getNames();
/// DistributedBlockOutputStream will not own cluster, but will own ConnectionPools of the cluster
return std::make_shared<DistributedSink>(
local_context, *this, metadata_snapshot,
createInsertToRemoteTableQuery(remote_database, remote_table, sample_block),
cluster, insert_sync, timeout, StorageID{remote_database, remote_table});
local_context, *this, metadata_snapshot, cluster, insert_sync, timeout,
StorageID{remote_database, remote_table}, columns_to_send);
}