Real fix of test.

This commit is contained in:
Vladimir Chebotarev 2020-07-23 17:10:48 +03:00
parent f5af64514f
commit 1b3f5c99f5
3 changed files with 27 additions and 35 deletions

View File

@ -561,12 +561,15 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::
/// and keep monitor thread out from reading incomplete data
std::string first_file_tmp_path{};
const auto & [disk, data_path] = storage.getPath();
auto reservation = storage.getStoragePolicy()->reserve(block.bytes());
auto disk = reservation->getDisk()->getPath();
auto data_path = storage.getRelativeDataPath();
auto it = dir_names.begin();
/// on first iteration write block to a temporary directory for subsequent
/// hardlinking to ensure the inode is not freed until we're done
{
const std::string path(disk + data_path + *it);
Poco::File(path).createDirectory();

View File

@ -290,7 +290,7 @@ StorageDistributed::StorageDistributed(
const String & cluster_name_,
const Context & context_,
const ASTPtr & sharding_key_,
const String & storage_policy_,
const String & storage_policy_name_,
const String & relative_data_path_,
bool attach_)
: IStorage(id_)
@ -300,7 +300,6 @@ StorageDistributed::StorageDistributed(
, log(&Poco::Logger::get("StorageDistributed (" + id_.table_name + ")"))
, cluster_name(global_context->getMacros()->expand(cluster_name_))
, has_sharding_key(sharding_key_)
, storage_policy(storage_policy_)
, relative_data_path(relative_data_path_)
{
StorageInMemoryMetadata storage_metadata;
@ -316,7 +315,11 @@ StorageDistributed::StorageDistributed(
}
if (!relative_data_path.empty())
createStorage();
{
storage_policy = global_context->getStoragePolicy(storage_policy_name_);
if (storage_policy->getVolumes().size() != 1)
throw Exception("Storage policy for Distributed table, should have exactly one volume", ErrorCodes::BAD_ARGUMENTS);
}
/// Sanity check. Skip check if the table is already created to allow the server to start.
if (!attach_ && !cluster_name.empty())
@ -336,22 +339,14 @@ StorageDistributed::StorageDistributed(
const String & cluster_name_,
const Context & context_,
const ASTPtr & sharding_key_,
const String & storage_policy_,
const String & storage_policy_name_,
const String & relative_data_path_,
bool attach)
: StorageDistributed(id_, columns_, constraints_, String{}, String{}, cluster_name_, context_, sharding_key_, storage_policy_, relative_data_path_, attach)
: StorageDistributed(id_, columns_, constraints_, String{}, String{}, cluster_name_, context_, sharding_key_, storage_policy_name_, relative_data_path_, attach)
{
remote_table_function_ptr = std::move(remote_table_function_ptr_);
}
void StorageDistributed::createStorage()
{
/// Create default policy with the relative_data_path_
auto policy = global_context->getStoragePolicy(storage_policy);
if (policy->getVolumes().size() != 1)
throw Exception("Storage policy for Distributed table, should have exactly one volume", ErrorCodes::BAD_ARGUMENTS);
volume = policy->getVolume(0);
}
StoragePtr StorageDistributed::createWithOwnCluster(
const StorageID & table_id_,
@ -527,7 +522,7 @@ BlockOutputStreamPtr StorageDistributed::write(const ASTPtr &, const StorageMeta
const auto & settings = context.getSettingsRef();
/// Ban an attempt to make async insert into the table belonging to DatabaseMemory
if (!volume && !owned_cluster && !settings.insert_distributed_sync)
if (!storage_policy && !owned_cluster && !settings.insert_distributed_sync)
{
throw Exception("Storage " + getName() + " must has own data directory to enable asynchronous inserts",
ErrorCodes::BAD_ARGUMENTS);
@ -583,10 +578,10 @@ void StorageDistributed::startup()
if (remote_database.empty() && !remote_table_function_ptr)
LOG_WARNING(log, "Name of remote database is empty. Default database will be used implicitly.");
if (!volume)
if (!storage_policy)
return;
for (const DiskPtr & disk : volume->getDisks())
for (const DiskPtr & disk : storage_policy->getDisks())
createDirectoryMonitors(disk->getPath());
for (const String & path : getDataPaths())
@ -620,7 +615,7 @@ void StorageDistributed::drop()
LOG_DEBUG(log, "Removing pending blocks for async INSERT from filesystem on DROP TABLE");
auto disks = volume->getDisks();
auto disks = storage_policy->getDisks();
for (const auto & disk : disks)
disk->removeRecursive(relative_data_path);
@ -634,7 +629,7 @@ Strings StorageDistributed::getDataPaths() const
if (relative_data_path.empty())
return paths;
for (const DiskPtr & disk : volume->getDisks())
for (const DiskPtr & disk : storage_policy->getDisks())
paths.push_back(disk->getPath() + relative_data_path);
return paths;
@ -657,9 +652,7 @@ void StorageDistributed::truncate(const ASTPtr &, const StorageMetadataPtr &, co
StoragePolicyPtr StorageDistributed::getStoragePolicy() const
{
if (storage_policy.empty())
return {};
return global_context->getStoragePolicy(storage_policy);
return storage_policy;
}
void StorageDistributed::createDirectoryMonitors(const std::string & disk)
@ -706,11 +699,6 @@ size_t StorageDistributed::getShardCount() const
return getCluster()->getShardCount();
}
std::pair<const std::string &, const std::string &> StorageDistributed::getPath()
{
return {volume->getDisk()->getPath(), relative_data_path};
}
ClusterPtr StorageDistributed::getCluster() const
{
return owned_cluster ? owned_cluster : global_context->getCluster(cluster_name);
@ -856,10 +844,13 @@ void StorageDistributed::rename(const String & new_path_to_table_data, const Sto
renameOnDisk(new_path_to_table_data);
renameInMemory(new_table_id);
}
void StorageDistributed::renameOnDisk(const String & new_path_to_table_data)
{
for (const DiskPtr & disk : volume->getDisks())
for (const DiskPtr & disk : storage_policy->getDisks())
{
disk->reserve(0);
const String path(disk->getPath());
auto new_path = path + new_path_to_table_data;
Poco::File(path + relative_data_path).renameTo(new_path);

View File

@ -102,7 +102,7 @@ public:
const ExpressionActionsPtr & getShardingKeyExpr() const { return sharding_key_expr; }
const String & getShardingKeyColumnName() const { return sharding_key_column_name; }
size_t getShardCount() const;
std::pair<const std::string &, const std::string &> getPath();
const String & getRelativeDataPath() const { return relative_data_path; }
std::string getRemoteDatabaseName() const { return remote_database; }
std::string getRemoteTableName() const { return remote_table; }
std::string getClusterName() const { return cluster_name; } /// Returns empty string if tables is used by TableFunctionRemote
@ -163,7 +163,7 @@ protected:
const String & cluster_name_,
const Context & context_,
const ASTPtr & sharding_key_,
const String & storage_policy_,
const String & storage_policy_name_,
const String & relative_data_path_,
bool attach_);
@ -175,16 +175,14 @@ protected:
const String & cluster_name_,
const Context & context_,
const ASTPtr & sharding_key_,
const String & storage_policy_,
const String & storage_policy_name_,
const String & relative_data_path_,
bool attach);
void createStorage();
String storage_policy;
String relative_data_path;
/// Can be empty if relative_data_path is empty. In this case, a directory for the data to be sent is not created.
VolumePtr volume;
StoragePolicyPtr storage_policy;
struct ClusterNodeData
{