This commit is contained in:
Michael Kolupaev 2014-07-14 18:07:47 +04:00
parent 62de3bc73a
commit 33955ee7dd
6 changed files with 166 additions and 11 deletions

View File

@ -112,6 +112,30 @@ public:
std::sort(vector.begin(), vector.end());
return (std::unique(vector.begin(), vector.end()) - vector.begin()) * 2 - size() - rhs.size();
}
Names getNames() const
{
Names res;
res.reserve(size);
for (const NameAndTypePair & column : *this)
{
res.push_back(column.name);
}
return res;
}
/// Оставить только столбцы, имена которых есть в names. В names могут быть лишние столбцы.
NamesAndTypesList intersect(const Names & names)
{
std::set<String> name_set(names.begin(), names.end());
NamesAndTypesList res;
for (const NameAndTypePair & column : *this)
{
if (name_set.count(column.name))
res.push_back(column);
}
return res;
}
};
typedef SharedPtr<NamesAndTypesList> NamesAndTypesListPtr;

View File

@ -55,6 +55,14 @@ public:
*/
void check(const Names & column_names) const;
/** Проверить, что все запрошенные имена есть в таблице и имеют правильные типы.
*/
void check(const NamesAndTypesList & columns) const;
/** Проверить, что все имена из пересечения names и columns есть в таблице и имеют одинаковые типы.
*/
void check(const NamesAndTypesList & columns, const Names & column_names) const;
/** Проверить, что блок с данными для записи содержит все столбцы таблицы с правильными типами,
* содержит только столбцы таблицы, и все столбцы различны.
* Если need_all, еще проверяет, что все столбцы таблицы есть в блоке.

View File

@ -21,11 +21,16 @@ public:
:
path(path_), block_size(block_size_), column_names(column_names_),
storage(storage_), owned_data_part(owned_data_part_),
part_columns_lock(new Poco::ScopedReadRWLock(owned_data_part->columns_lock)),
all_mark_ranges(mark_ranges_), remaining_mark_ranges(mark_ranges_),
use_uncompressed_cache(use_uncompressed_cache_),
prewhere_actions(prewhere_actions_), prewhere_column(prewhere_column_),
log(&Logger::get("MergeTreeBlockInputStream"))
{
/// Под owned_data_part->columns_lock проверим, что все запрошенные столбцы в куске того же типа, что в таблице.
/// Это может быть не так во время ALTER MODIFY.
storage.check(owned_data_part->columns, column_names);
std::reverse(remaining_mark_ranges.begin(), remaining_mark_ranges.end());
if (prewhere_actions)
@ -246,6 +251,9 @@ protected:
* буферы не висели в памяти.
*/
reader.reset();
pre_reader.reset();
part_columns_lock.reset();
owned_data_part.reset();
}
return res;
@ -259,6 +267,7 @@ private:
Names pre_column_names;
MergeTreeData & storage;
const MergeTreeData::DataPartPtr owned_data_part; /// Кусок не будет удалён, пока им владеет этот объект.
std::unique_ptr<Poco::ScopedReadRWLock> part_columns_lock; /// Не дадим изменить список столбцов куска, пока мы из него читаем.
MarkRanges all_mark_ranges; /// В каких диапазонах засечек читать. В порядке возрастания номеров.
MarkRanges remaining_mark_ranges; /// В каких диапазонах засечек еще не прочли.
/// В порядке убывания номеров, чтобы можно было выбрасывать из конца.

View File

@ -223,8 +223,22 @@ public:
/// Описание столбцов.
NamesAndTypesList columns;
/** Блокируется на запись при изменении columns, checksums или любых файлов куска.
* Блокируется на чтение при чтении columns, checksums или любых файлов куска.
*/
Poco::RWLock columns_lock;
/** Берется на все время ALTER куска: от начала записи временных фалов до их переименования в постоянные.
* Берется при разлоченном columns_lock.
*
* NOTE: "Можно" было бы обойтись без этого мьютекса, если бы можно было превращать ReadRWLock в WriteRWLock, не снимая блокировку.
* Такое превращение невозможно, потому что создало бы дедлок, если делать его из двух потоков сразу.
* Взятие этого мьютекса означает, что мы хотим заблокировать columns_lock на чтение с намерением потом, не
* снимая блокировку, заблокировать его на запись.
*/
Poco::FastMutex alter_mutex;
/// NOTE можно загружать засечки тоже в оперативку
/// Вычисляем сумарный размер всей директории со всеми файлами
@ -440,9 +454,19 @@ public:
private:
friend class MergeTreeData;
AlterDataPartTransaction(DataPartPtr data_part_) : data_part(data_part_) {}
AlterDataPartTransaction(DataPartPtr data_part_) : data_part(data_part_), alter_lock(data_part->alter_mutex) {}
void clear()
{
alter_lock.unlock();
data_part = nullptr;
}
DataPartPtr data_part;
Poco::ScopedLockWithUnlock alter_lock;
DataPart::Checksums new_checksums;
NamesAndTypesList new_columns;
/// Если значение - пустая строка, файл нужно удалить, и он не временный.
NameToNameMap rename_map;
};

View File

@ -138,6 +138,71 @@ void ITableDeclaration::check(const Names & column_names) const
}
void ITableDeclaration::check(const NamesAndTypesList & columns) const
{
const NamesAndTypesList & available_columns = getColumnsList();
const NamesAndTypesMap & columns_map = getColumnsMap(available_columns);
typedef google::dense_hash_set<StringRef, StringRefHash> UniqueStrings;
UniqueStrings unique_names;
unique_names.set_empty_key(StringRef());
for (const NameAndTypePair & column : columns)
{
NamesAndTypesMap::const_iterator it = columns_map.find(column.name);
if (columns_map.end() == it)
throw Exception("There is no column with name " + column.name + ". There are columns: "
+ listOfColumns(available_columns), ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
if (column.type->getName() != it->second->getName())
throw Exception("Type mismatch for column " + column.name + ". Column has type "
+ it->second->getName() + ", got type " + column.type->getName(), ErrorCodes::TYPE_MISMATCH);
if (unique_names.end() != unique_names.find(column.name))
throw Exception("Column " + column.name + " queried more than once",
ErrorCodes::COLUMN_QUERIED_MORE_THAN_ONCE);
unique_names.insert(column.name);
}
}
void ITableDeclaration::check(const NamesAndTypesList & columns, const Names & column_names) const
{
if (column_names.empty())
throw Exception("Empty list of columns queried. There are columns: " + listOfColumns(available_columns),
ErrorCodes::EMPTY_LIST_OF_COLUMNS_QUERIED);
const NamesAndTypesList & available_columns = getColumnsList();
const NamesAndTypesMap & available_columns_map = getColumnsMap(available_columns);
const NamesAndTypesMap & provided_columns_map = getColumnsMap(columns);
typedef google::dense_hash_set<StringRef, StringRefHash> UniqueStrings;
UniqueStrings unique_names;
unique_names.set_empty_key(StringRef());
for (const String & name : column_names)
{
NamesAndTypesMap::const_iterator it = provided_columns_map.find(name);
if (provided_columns_map.end() == it)
continue;
NamesAndTypesMap::const_iterator jt = available_columns_map.find(name);
if (available_columns_map.end() == jt)
throw Exception("There is no column with name " + name + ". There are columns: "
+ listOfColumns(available_columns), ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
if (it->second->getName() != jt->second->getName())
throw Exception("Type mismatch for column " + name + ". Column has type "
+ jt->second->getName() + ", got type " + it->second->getName(), ErrorCodes::TYPE_MISMATCH);
if (unique_names.end() != unique_names.find(name))
throw Exception("Column " + name + " queried more than once",
ErrorCodes::COLUMN_QUERIED_MORE_THAN_ONCE);
unique_names.insert(name);
}
}
void ITableDeclaration::check(const Block & block, bool need_all) const
{
const NamesAndTypesList & available_columns = getColumnsList();

View File

@ -423,11 +423,11 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(DataPart
{
ExpressionActionsPtr expression;
AlterDataPartTransactionPtr transaction(new AlterDataPartTransaction(part));
createConvertExpression(part, *columns, new_columns, expression, transaction->rename_map); // TODO: part->columns
createConvertExpression(part, part->columns, new_columns, expression, transaction->rename_map);
if (transaction->rename_map.empty())
{
transaction->data_part = nullptr;
transaction->clear();
return transaction;
}
@ -468,11 +468,20 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(DataPart
/// Запишем обновленные контрольные суммы во временный файл
if (!part->checksums.empty())
{
transaction->new_checksums = new_checksums;
WriteBufferFromFile checksums_file(full_path + part->name + "/checksums.txt.tmp", 4096);
new_checksums.writeText(checksums_file);
transaction->rename_map["checksums.txt.tmp"] = "checksums.txt";
}
/// Запишем обновленный список столбцов во временный файл.
{
transaction->new_columns = new_columns.intersect(part->columns.getNames());
WriteBufferFromFile columns_file(full_path + part->name + "/columns.txt.tmp", 4096);
transaction->new_columns.writeText(columns_file);
transaction->rename_map["columns.txt.tmp"] = "columns.txt";
}
return transaction;
}
@ -482,25 +491,41 @@ void MergeTreeData::AlterDataPartTransaction::commit()
return;
try
{
Poco::ScopedWriteRWLock lock(data_part->columns_lock);
String path = data_part->storage.full_path + data_part->name + "/";
/// 1) Переименуем старые файлы.
for (auto it : rename_map)
{
if (it.second.empty())
{
Poco::File(path + it.first).renameTo(path + it.first + ".removing");
Poco::File(path + it.first + ".removing").remove();
}
else
String name = it.second.empty() ? it.first : it.second;
Poco::File(path + name).renameTo(path + name + ".tmp2");
}
/// 2) Переместим на их место новые.
for (auto it : rename_map)
{
if (!it.second.empty())
{
Poco::File(path + it.first).renameTo(path + it.second);
}
}
data_part = nullptr;
data_part->checksums = new_checksums;
data_part->columns = new_columns;
/// 3) Удалим старые файлы.
for (auto it : rename_map)
{
String name = it.second.empty() ? it.first : it.second;
Poco::File(path + name + ".tmp2").remove();
}
clear();
}
catch (...)
{
/// Если что-то пошло не так, не будем удалять временные файлы в деструкторе.
data_part = nullptr;
clear();
throw;
}
}