This commit is contained in:
Andrey Mironov 2014-10-23 16:16:24 +04:00
parent 847b91614a
commit 934149d59a
3 changed files with 241 additions and 149 deletions

View File

@ -2,6 +2,9 @@
#include <DB/Parsers/IAST.h>
#include <DB/Parsers/formatAST.h>
#include <DB/Parsers/ExpressionListParsers.h>
#include <DB/IO/WriteBufferFromString.h>
#include <statdaemons/ext/range.hpp>
#include <unordered_map>
namespace DB
@ -67,7 +70,67 @@ namespace DB
using std::unordered_map<String, ColumnDefault>::unordered_map;
/// @todo implement (de)serialization
String toString() const { return {}; }
static ColumnDefaults parse(const String & str) { return {}; }
String toString() const
{
String s;
WriteBufferFromString buf{s};
writeString("column defaults format version: 1\n", buf);
writeText(size(), buf);
writeString(" columns:\n", buf);
for (const auto & column_default : *this)
{
writeBackQuotedString(column_default.first, buf);
writeChar(' ', buf);
writeString(DB::toString(column_default.second.type), buf);
writeChar('\t', buf);
writeString(queryToString(column_default.second.expression), buf);
writeChar('\n', buf);
}
return s;
}
static ColumnDefaults parse(const String & str) {
ReadBufferFromString buf{str};
ColumnDefaults defaults{};
assertString("column defaults format version: 1\n", buf);
size_t count{};
readText(count, buf);
assertString(" columns:\n", buf);
ParserTernaryOperatorExpression expr_parser;
for (size_t i = 0; i < count; ++i)
{
String column_name;
readBackQuotedString(column_name, buf);
assertString(" ", buf);
String default_type_str;
readString(default_type_str, buf);
const auto default_type = columnDefaultTypeFromString(default_type_str);
assertString("\t", buf);
String default_expr_str;
readText(default_expr_str, buf);
assertString("\n", buf);
ASTPtr default_expr;
Expected expected{};
auto begin = default_expr_str.data();
const auto end = begin + default_expr_str.size();
if (!expr_parser.parse(begin, end, default_expr, expected))
throw Exception{"Could not parse default expression", DB::ErrorCodes::CANNOT_PARSE_TEXT};
defaults.emplace(column_name, ColumnDefault{default_type, default_expr});
}
assertEOF(buf);
return defaults;
}
};
}

View File

@ -1619,11 +1619,13 @@ void StorageReplicatedMergeTree::alterThread()
&stat, alter_thread_event);
const String alias_columns_str = zookeeper->get(zookeeper_path + "/alias_columns",
&stat, alter_thread_event);
const String column_defaults_str = zookeeper->get(zookeeper_path + "/column_defaults",
&stat, alter_thread_event);
NamesAndTypesList columns = NamesAndTypesList::parse(columns_str, context.getDataTypeFactory());
NamesAndTypesList materialized_columns = NamesAndTypesList::parse(
materialized_columns_str, context.getDataTypeFactory());
NamesAndTypesList alias_columns = NamesAndTypesList::parse(alias_columns_str, context.getDataTypeFactory());
ColumnDefaults column_defaults;
ColumnDefaults column_defaults = ColumnDefaults::parse(column_defaults_str);
bool changed_version = (stat.version != columns_version);
@ -1637,7 +1639,7 @@ void StorageReplicatedMergeTree::alterThread()
const auto columns_changed = columns != data.getColumnsListNonMaterialized();
const auto materialized_columns_changed = materialized_columns != data.materialized_columns;
const auto alias_columns_changed = alias_columns != data.alias_columns;
const auto column_defaults_changed = false;
const auto column_defaults_changed = column_defaults != data.column_defaults;
if (columns_changed || materialized_columns_changed || alias_columns_changed ||
column_defaults_changed)
@ -2444,9 +2446,18 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params,
while (!shutdown_called)
{
String replica_columns_str;
String replica_materialized_columns_str;
String replica_alias_columns_str;
String replica_column_defaults_str;
/// Реплику могли успеть удалить.
if (!zookeeper->tryGet(zookeeper_path + "/replicas/" + replica + "/columns", replica_columns_str, &stat))
if (!zookeeper->tryGet(zookeeper_path + "/replicas/" + replica + "/columns", replica_columns_str, &stat) ||
!zookeeper->tryGet(zookeeper_path + "/replicas/" + replica + "/materialized_columns",
replica_materialized_columns_str, &stat) ||
!zookeeper->tryGet(zookeeper_path + "/replicas/" + replica + "/alias_columns",
replica_alias_columns_str, &stat) ||
!zookeeper->tryGet(zookeeper_path + "/replicas/" + replica + "/column_defaults",
replica_column_defaults_str, &stat))
{
LOG_WARNING(log, replica << " was removed");
break;
@ -2454,12 +2465,24 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params,
int replica_columns_version = stat.version;
if (replica_columns_str == new_columns_str)
if (replica_columns_str == new_columns_str &&
replica_materialized_columns_str == new_materialized_columns_str &&
replica_alias_columns_str == new_alias_columns_str &&
replica_column_defaults_str == new_column_defaults_str)
break;
if (!zookeeper->exists(zookeeper_path + "/columns", &stat))
throw Exception(zookeeper_path + "/columns doesn't exist", ErrorCodes::NOT_FOUND_NODE);
if (!zookeeper->exists(zookeeper_path + "/materialized_columns", &stat))
throw Exception(zookeeper_path + "/materialized_columns doesn't exist", ErrorCodes::NOT_FOUND_NODE);
if (!zookeeper->exists(zookeeper_path + "/alias_columns", &stat))
throw Exception(zookeeper_path + "/alias_columns doesn't exist", ErrorCodes::NOT_FOUND_NODE);
if (!zookeeper->exists(zookeeper_path + "/column_defaults", &stat))
throw Exception(zookeeper_path + "/column_defaults doesn't exist", ErrorCodes::NOT_FOUND_NODE);
if (stat.version != new_columns_version)
{
LOG_WARNING(log, zookeeper_path + "/columns changed before ALTER finished; "
@ -2467,7 +2490,13 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params,
return;
}
if (!zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/columns", &stat, alter_query_event))
if (!zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/columns", &stat, alter_query_event) ||
!zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/materialized_columns",
&stat, alter_query_event) ||
!zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/alias_columns",
&stat, alter_query_event) ||
!zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/column_defaults",
&stat, alter_query_event))
{
LOG_WARNING(log, replica << " was removed");
break;