More general

This commit is contained in:
alesapin 2019-08-07 18:21:45 +03:00
parent 5177309001
commit 13e4581317
6 changed files with 93 additions and 24 deletions

View File

@ -1,6 +1,8 @@
#include <Storages/IStorage.h>
#include <Storages/AlterCommands.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTSetQuery.h>
#include <sparsehash/dense_hash_map>
#include <sparsehash/dense_hash_set>
@ -19,6 +21,7 @@ namespace ErrorCodes
extern const int NOT_FOUND_COLUMN_IN_BLOCK;
extern const int TYPE_MISMATCH;
extern const int SETTINGS_ARE_NOT_SUPPORTED;
extern const int UNKNOWN_SETTING;
}
IStorage::IStorage(ColumnsDescription columns_)
@ -295,7 +298,9 @@ bool IStorage::isVirtualColumn(const String & column_name) const
bool IStorage::hasSetting(const String & /* setting_name */) const
{
if (!supportsSettings())
throw Exception("Storage '" + getName() + "' doesn't support settings.", ErrorCodes::SETTINGS_ARE_NOT_SUPPORTED);
return false;
}
TableStructureReadLockHolder IStorage::lockStructureForShare(bool will_add_new_data, const String & query_id)
@ -352,6 +357,39 @@ TableStructureWriteLockHolder IStorage::lockExclusively(const String & query_id)
return result;
}
void IStorage::alterSettings(
const SettingsChanges & new_changes,
const String & current_database_name,
const String & current_table_name,
const Context & context,
TableStructureWriteLockHolder & /* table_lock_holder */)
{
IDatabase::ASTModifier storage_modifier = [&] (IAST & ast)
{
if (!new_changes.empty())
{
auto & storage_changes = ast.as<ASTStorage &>().settings->changes;
/// Make storage settings unique
for (const auto & change : new_changes)
{
if (hasSetting(change.name))
{
auto finder = [&change] (const SettingChange & c) { return c.name == change.name;};
if (auto it = std::find_if(storage_changes.begin(), storage_changes.end(), finder); it != storage_changes.end())
it->value = change.value;
else
storage_changes.push_back(change);
}
else
throw Exception{"Storage '" + getName() + "' doesn't have setting '" + change.name + "'", ErrorCodes::UNKNOWN_SETTING};
}
}
};
context.getDatabase(current_database_name)->alterTable(context, current_table_name, getColumns(), getIndices(), storage_modifier);
}
void IStorage::alter(
const AlterCommands & params,
const String & database_name,
@ -359,12 +397,17 @@ void IStorage::alter(
const Context & context,
TableStructureWriteLockHolder & table_lock_holder)
{
for (const auto & param : params)
if (params.isSettingsAlter())
{
if (param.isMutable())
throw Exception("Method alter supports only change comment of column for storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
SettingsChanges new_changes;
params.applyForSettingsOnly(new_changes);
alterSettings(new_changes, database_name, table_name, context, table_lock_holder);
return;
}
if (params.isMutable())
throw Exception("Method alter supports only change comment of column for storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId());
auto new_columns = getColumns();
auto new_indices = getIndices();

View File

@ -12,6 +12,7 @@
#include <Common/ActionLock.h>
#include <Common/Exception.h>
#include <Common/RWLock.h>
#include <Common/SettingsChanges.h>
#include <optional>
#include <shared_mutex>
@ -95,6 +96,9 @@ public:
/// Returns true if the storage supports deduplication of inserted data blocks.
virtual bool supportsDeduplication() const { return false; }
/// Returns true if the storage supports settings.
virtual bool supportsSettings() const { return false; }
/// Optional size information of each physical column.
/// Currently it's only used by the MergeTree family for query optimizations.
using ColumnSizeByName = std::unordered_map<std::string, ColumnSize>;
@ -252,6 +256,15 @@ public:
throw Exception("Partition operations are not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
/** ALTER table settings if possible. Otherwise throws exception.
*/
virtual void alterSettings(
const SettingsChanges & new_changes,
const String & current_database_name,
const String & current_table_name,
const Context & context,
TableStructureWriteLockHolder & table_lock_holder);
/** Perform any background work. For example, combining parts in a MergeTree type table.
* Returns whether any work has been done.
*/

View File

@ -40,6 +40,7 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int UNSUPPORTED_METHOD;
}
namespace
@ -388,6 +389,22 @@ bool StorageKafka::streamToViews()
}
bool StorageKafka::hasSetting(const String & setting_name) const
{
return KafkaSettings::findIndex(setting_name) != KafkaSettings::npos;
}
void StorageKafka::alterSettings(
const SettingsChanges & /* new_changes */,
const String & /* current_database_name */,
const String & /* current_table_name */,
const Context & /* context */,
TableStructureWriteLockHolder & /* table_lock_holder */)
{
throw Exception("Storage '" + getName() + "' doesn't support settings alter", ErrorCodes::UNSUPPORTED_METHOD);
}
void registerStorageKafka(StorageFactory & factory)
{
factory.registerStorage("Kafka", [](const StorageFactory::Arguments & args)

View File

@ -24,6 +24,7 @@ public:
std::string getName() const override { return "Kafka"; }
std::string getTableName() const override { return table_name; }
std::string getDatabaseName() const override { return database_name; }
bool supportsSettings() const override { return true; }
void startup() override;
void shutdown() override;
@ -50,6 +51,15 @@ public:
const auto & getSchemaName() const { return schema_name; }
const auto & skipBroken() const { return skip_broken; }
bool hasSetting(const String & setting_name) const override;
void alterSettings(
const SettingsChanges & new_changes,
const String & current_database_name,
const String & current_table_name,
const Context & context,
TableStructureWriteLockHolder & table_lock_holder) override;
protected:
StorageKafka(
const std::string & table_name_,

View File

@ -1588,26 +1588,10 @@ void MergeTreeData::alterSettings(
const String & current_database_name,
const String & current_table_name,
const Context & context,
TableStructureWriteLockHolder & /* table_lock_holder */)
TableStructureWriteLockHolder & table_lock_holder)
{
settings.updateFromChanges(new_changes);
IDatabase::ASTModifier storage_modifier = [&] (IAST & ast)
{
if (!new_changes.empty())
{
auto & storage_changes = ast.as<ASTStorage &>().settings->changes;
/// Make storage settings unique
for (const auto & change : new_changes)
{
auto finder = [&change] (const SettingChange & c) { return c.name == change.name;};
if (auto it = std::find_if(storage_changes.begin(), storage_changes.end(), finder); it != storage_changes.end())
it->value = change.value;
else
storage_changes.push_back(change);
}
}
};
context.getDatabase(current_database_name)->alterTable(context, current_table_name, getColumns(), getIndices(), storage_modifier);
IStorage::alterSettings(new_changes, current_database_name, current_table_name, context, table_lock_holder);
}
bool MergeTreeData::hasSetting(const String & setting_name) const

View File

@ -342,6 +342,8 @@ public:
|| merging_params.mode == MergingParams::VersionedCollapsing;
}
bool supportsSettings() const override { return true; }
bool mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, const Context &) const override;
NameAndTypePair getColumn(const String & column_name) const override
@ -516,7 +518,7 @@ public:
const String & current_database_name,
const String & current_table_name,
const Context & context,
TableStructureWriteLockHolder & table_lock_holder);
TableStructureWriteLockHolder & table_lock_holder) override;
/// All MergeTreeData children have settings.
bool hasSetting(const String & setting_name) const override;