Alexey Milovidov 2014-07-29 02:31:05 +04:00
commit 2fd6dc4f2c
26 changed files with 390 additions and 90 deletions


@ -238,7 +238,7 @@ public:
class ExtractAllImpl
{
private:
const OptimizedRegularExpression * re = nullptr;
Regexps::Pointer re;
OptimizedRegularExpression::MatchVec matches;
size_t capture;
@ -263,7 +263,7 @@ public:
throw Exception("Illegal column " + col->getName() + " of first argument of function " + getName() + ". Must be constant string.",
ErrorCodes::ILLEGAL_COLUMN);
re = &Regexps::get(col->getData());
re = Regexps::get(col->getData());
capture = re->getNumberOfSubpatterns() > 0 ? 1 : 0;
matches.resize(capture + 1);


@ -3,6 +3,7 @@
#include <Poco/Mutex.h>
#include <statdaemons/OptimizedRegularExpression.h>
#include <statdaemons/stdext.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/DataTypes/DataTypeString.h>
@ -13,6 +14,9 @@
#include <re2/re2.h>
#include <re2/stringpiece.h>
#include <mutex>
#include <stack>
namespace DB
{
@ -240,38 +244,80 @@ inline bool likePatternIsStrstr(const String & pattern, String & res)
}
struct Regexps
namespace Regexps
{
typedef std::map<String, OptimizedRegularExpression> KnownRegexps;
struct Holder;
struct Deleter;
static const OptimizedRegularExpression & get(const std::string & pattern)
using Regexp = OptimizedRegularExpressionImpl<false>;
using KnownRegexps = std::map<String, std::unique_ptr<Holder>>;
using Pointer = std::unique_ptr<Regexp, Deleter>;
/// Container for regular expressions with embedded mutex for safe addition and removal
struct Holder
{
/// Thread-safe statics in GCC.
static KnownRegexps known_regexps;
static Poco::FastMutex mutex;
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
std::mutex mutex;
std::stack<std::unique_ptr<Regexp>> stack;
KnownRegexps::const_iterator it = known_regexps.find(pattern);
if (known_regexps.end() == it)
it = known_regexps.insert(std::make_pair(pattern, OptimizedRegularExpression(pattern))).first;
/** Extracts and returns a pointer from the collection if it's not empty,
* creates a new one by calling provided f() otherwise.
*/
template <typename Factory> Pointer get(Factory && f);
};
return it->second;
/** Specialized deleter for std::unique_ptr.
* Returns underlying pointer back to holder thus reclaiming its ownership.
*/
struct Deleter
{
Holder * holder;
Deleter(Holder * holder = nullptr) : holder{holder} {}
void operator()(Regexp * owning_ptr) const
{
std::lock_guard<std::mutex> lock{holder->mutex};
holder->stack.emplace(owning_ptr);
}
static const OptimizedRegularExpression & getLike(const std::string & pattern)
};
template <typename Factory>
inline Pointer Holder::get(Factory && f)
{
/// Thread-safe statics in GCC.
static KnownRegexps known_regexps;
static Poco::FastMutex mutex;
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
std::lock_guard<std::mutex> lock{mutex};
KnownRegexps::const_iterator it = known_regexps.find(pattern);
if (known_regexps.end() == it)
it = known_regexps.insert(std::make_pair(pattern, OptimizedRegularExpression(likePatternToRegexp(pattern)))).first;
if (stack.empty())
return { f(), this };
return it->second;
auto regexp = stack.top().release();
stack.pop();
return { regexp, this };
}
};
template <bool like>
inline Regexp createRegexp(const std::string & pattern) { return pattern; }
template <>
inline Regexp createRegexp<true>(const std::string & pattern) { return likePatternToRegexp(pattern); }
template <bool like = false>
inline Pointer get(const std::string & pattern)
{
/// C++11 has thread-safe function-local statics on most modern compilers.
static KnownRegexps known_regexps;
static std::mutex mutex;
std::lock_guard<std::mutex> lock{mutex};
auto it = known_regexps.find(pattern);
if (known_regexps.end() == it)
it = known_regexps.emplace(pattern, stdext::make_unique<Holder>()).first;
return it->second->get([&pattern] {
return new Regexp{createRegexp<like>(pattern)};
});
}
}
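A note on the pattern introduced above: Regexps::get<like>() keeps one Holder per pattern string, and the Holder is a small pool of compiled Regexp objects guarded by its own mutex. The returned Pointer is a std::unique_ptr with a custom Deleter that, instead of destroying the regular expression, pushes it back onto the holder's stack, so each concurrent caller works with its own compiled instance and compilation only happens when the pool runs dry. A minimal, self-contained sketch of the same pool-via-deleter idiom (Pool, Returner and the std::string payload are illustrative names, not part of this codebase):

#include <iostream>
#include <memory>
#include <mutex>
#include <stack>
#include <string>

template <typename T>
class Pool
{
public:
    /// Custom deleter: returns the object to the pool instead of destroying it.
    struct Returner
    {
        Pool * pool;
        explicit Returner(Pool * pool_ = nullptr) : pool(pool_) {}
        void operator()(T * ptr) const
        {
            std::lock_guard<std::mutex> lock(pool->mutex);
            pool->free.emplace(ptr);
        }
    };

    using Pointer = std::unique_ptr<T, Returner>;

    /// Borrow an object; construct a fresh one via factory() only if the pool is empty.
    template <typename Factory>
    Pointer get(Factory && factory)
    {
        std::lock_guard<std::mutex> lock(mutex);
        if (free.empty())
            return Pointer(factory(), Returner(this));
        T * ptr = free.top().release();
        free.pop();
        return Pointer(ptr, Returner(this));
    }

private:
    std::mutex mutex;
    std::stack<std::unique_ptr<T>> free;
};

int main()
{
    Pool<std::string> pool;
    {
        auto borrowed = pool.get([] { return new std::string("compiled once"); });
        std::cout << *borrowed << "\n";
    }   /// borrowed goes out of scope here: the object returns to the pool, it is not destroyed.
    auto reused = pool.get([] { return new std::string("never constructed"); });
    std::cout << *reused << "\n";   /// prints "compiled once" - the pooled instance was handed out again.
}

As in the diff, the pool must outlive every borrowed pointer (the real code keeps the holders in a function-local static map), otherwise the deleter would dereference a dead Holder.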
/** like - use LIKE expressions if true; use re2 expressions if false.
@ -325,17 +371,17 @@ struct MatchImpl
}
else
{
const OptimizedRegularExpression & regexp = like ? Regexps::getLike(pattern) : Regexps::get(pattern);
const auto & regexp = Regexps::get<like>(pattern);
size_t size = offsets.size();
for (size_t i = 0; i < size; ++i)
res[i] = revert ^ regexp.match(reinterpret_cast<const char *>(&data[i != 0 ? offsets[i - 1] : 0]), (i != 0 ? offsets[i] - offsets[i - 1] : offsets[0]) - 1);
res[i] = revert ^ regexp->match(reinterpret_cast<const char *>(&data[i != 0 ? offsets[i - 1] : 0]), (i != 0 ? offsets[i] - offsets[i - 1] : offsets[0]) - 1);
}
}
static void constant(const std::string & data, const std::string & pattern, UInt8 & res)
{
const OptimizedRegularExpression & regexp = like ? Regexps::getLike(pattern) : Regexps::get(pattern);
res = revert ^ regexp.match(data);
const auto & regexp = Regexps::get<like>(pattern);
res = revert ^ regexp->match(data);
}
};
@ -349,9 +395,9 @@ struct ExtractImpl
res_data.reserve(data.size() / 5);
res_offsets.resize(offsets.size());
const OptimizedRegularExpression & regexp = Regexps::get(pattern);
const auto & regexp = Regexps::get(pattern);
unsigned capture = regexp.getNumberOfSubpatterns() > 0 ? 1 : 0;
unsigned capture = regexp->getNumberOfSubpatterns() > 0 ? 1 : 0;
OptimizedRegularExpression::MatchVec matches;
matches.reserve(capture + 1);
size_t prev_offset = 0;
@ -361,10 +407,10 @@ struct ExtractImpl
{
size_t cur_offset = offsets[i];
unsigned count = regexp.match(reinterpret_cast<const char *>(&data[prev_offset]), cur_offset - prev_offset - 1, matches, capture + 1);
unsigned count = regexp->match(reinterpret_cast<const char *>(&data[prev_offset]), cur_offset - prev_offset - 1, matches, capture + 1);
if (count > capture && matches[capture].offset != std::string::npos)
{
const OptimizedRegularExpression::Match & match = matches[capture];
const auto & match = matches[capture];
res_data.resize(res_offset + match.length + 1);
memcpy(&res_data[res_offset], &data[prev_offset + match.offset], match.length);
res_offset += match.length;
@ -462,7 +508,7 @@ struct ReplaceRegexpImpl
if (searcher.Match(input, start_pos, input.length(), re2::RE2::Anchor::UNANCHORED, matches, capture))
{
const re2::StringPiece & match = matches[0];
const auto & match = matches[0];
size_t char_to_copy = (match.data() - input.data()) - start_pos;
/// Copy the data without modification
@ -544,7 +590,7 @@ struct ReplaceRegexpImpl
if (searcher.Match(input, start_pos, input.length(), re2::RE2::Anchor::UNANCHORED, matches, capture))
{
const re2::StringPiece & match = matches[0];
const auto & match = matches[0];
size_t char_to_copy = (match.data() - input.data()) - start_pos;
/// Copy the data without modification
@ -618,7 +664,7 @@ struct ReplaceRegexpImpl
if (searcher.Match(input, start_pos, input.length(), re2::RE2::Anchor::UNANCHORED, matches, capture))
{
const re2::StringPiece & match = matches[0];
const auto & match = matches[0];
size_t char_to_copy = (match.data() - input.data()) - start_pos;
/// Copy the data without modification


@ -191,7 +191,7 @@ public:
* In this function you need to rename the directory with the data, if there is one.
* Called while the table structure is locked for writing.
*/
virtual void rename(const String & new_path_to_db, const String & new_name)
virtual void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name)
{
throw Exception("Method rename is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}


@ -10,6 +10,7 @@
#include <DB/IO/ReadBufferFromString.h>
#include <DB/IO/WriteBufferFromFile.h>
#include <DB/Common/escapeForFileName.h>
#include <DB/DataTypes/DataTypeString.h>
#include <Poco/RWLock.h>
@ -572,6 +573,18 @@ public:
const NamesAndTypesList & getColumnsList() const { return *columns; }
NameAndTypePair getColumn(const String &column_name) const
{
if (column_name == "_part") return NameAndTypePair("_part", new DataTypeString);
return getRealColumn(column_name);
}
bool hasColumn(const String &column_name) const
{
if (column_name == "_part") return true;
return hasRealColumn(column_name);
}
String getFullPath() const { return full_path; }
String getLogName() const { return log_name; }
@ -615,8 +628,9 @@ public:
void replaceParts(const DataPartsVector & remove, const DataPartsVector & add, bool clear_without_timeout);
/** Renames the part to prefix_part and removes it from the working set.
* If restore_covered, adds to the working set the inactive parts whose merge produced the part being removed.
*/
void renameAndDetachPart(DataPartPtr part, const String & prefix);
void renameAndDetachPart(DataPartPtr part, const String & prefix, bool restore_covered = false);
/** Returns old obsolete parts that can be deleted. At the same time removes them from the list of parts, but not from disk.
*/
@ -639,7 +653,7 @@ public:
* Resets the caches of decompressed blocks and marks.
* Must be called under a locked lockStructureForAlter().
*/
void setPath(const String & full_path);
void setPath(const String & full_path, bool move_data);
/* Check that this ALTER can be executed:
* - all the required columns are present.


@ -55,7 +55,8 @@ private:
size_t max_block_size,
bool use_uncompressed_cache,
ExpressionActionsPtr prewhere_actions,
const String & prewhere_column);
const String & prewhere_column,
bool add_virtual_column);
BlockInputStreams spreadMarkRangesAmongThreadsFinal(
RangesInDataParts parts,
@ -64,7 +65,8 @@ private:
size_t max_block_size,
bool use_uncompressed_cache,
ExpressionActionsPtr prewhere_actions,
const String & prewhere_column);
const String & prewhere_column,
bool add_virtual_column);
/// Create the expression "Sign == 1".
void createPositiveSignCondition(ExpressionActionsPtr & out_expression, String & out_column);


@ -18,6 +18,7 @@ public:
void write(const Block & block) override
{
auto part_blocks = storage.writer.splitBlockIntoParts(block);
for (auto & current_block : part_blocks)
{
storage.data.delayInsertIfNeeded();


@ -62,7 +62,7 @@ public:
}
/// Renaming would break the integrity of the reference counts from the ChunkRef tables.
void rename(const String & new_path_to_db, const String & new_name)
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name)
{
throw Exception("Table doesn't support renaming", ErrorCodes::NOT_IMPLEMENTED);
}


@ -56,7 +56,7 @@ public:
unsigned threads = 1);
void drop() override {}
void rename(const String & new_path_to_db, const String & new_name) { name = new_name; }
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) { name = new_table_name; }
/// columns in the subtables must be added and removed manually
/// the structure of the subtables is not checked
void alter(const AlterCommands & params, const String & database_name, const String & table_name, Context & context);


@ -154,7 +154,7 @@ public:
BlockOutputStreamPtr write(
ASTPtr query);
void rename(const String & new_path_to_db, const String & new_name);
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name);
protected:
String path;


@ -83,7 +83,7 @@ public:
ASTPtr query);
void drop() override;
void rename(const String & new_path_to_db, const String & new_name) { name = new_name; }
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) { name = new_table_name; }
private:
String name;


@ -45,7 +45,7 @@ public:
unsigned threads = 1);
void drop() override {}
void rename(const String & new_path_to_db, const String & new_name) { name = new_name; }
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) { name = new_table_name; }
/// columns in the subtables must be added and removed manually
/// the structure of the subtables is not checked


@ -50,6 +50,16 @@ public:
const NamesAndTypesList & getColumnsList() const { return data.getColumnsList(); }
NameAndTypePair getColumn(const String &column_name) const
{
return data.getColumn(column_name);
}
bool hasColumn(const String &column_name) const
{
return data.hasColumn(column_name);
}
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
@ -69,7 +79,7 @@ public:
void drop() override;
void rename(const String & new_path_to_db, const String & new_name);
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name);
void alter(const AlterCommands & params, const String & database_name, const String & table_name, Context & context);


@ -42,7 +42,7 @@ public:
return new NullBlockOutputStream;
}
void rename(const String & new_path_to_db, const String & new_name) { name = new_name; }
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) { name = new_table_name; }
private:
String name;


@ -6,6 +6,7 @@
#include <DB/Storages/MergeTree/MergeTreeDataWriter.h>
#include <DB/Storages/MergeTree/MergeTreeDataSelectExecutor.h>
#include <DB/Storages/MergeTree/ReplicatedMergeTreePartsExchange.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <zkutil/ZooKeeper.h>
#include <zkutil/LeaderElection.h>
#include <statdaemons/threadpool.hpp>
@ -50,6 +51,18 @@ public:
const NamesAndTypesList & getColumnsList() const override { return data.getColumnsList(); }
NameAndTypePair getColumn(const String &column_name) const
{
if (column_name == "_replicated") return NameAndTypePair("_replicated", new DataTypeUInt8);
return data.getColumn(column_name);
}
bool hasColumn(const String &column_name) const
{
if (column_name == "_replicated") return true;
return data.hasColumn(column_name);
}
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
@ -68,6 +81,8 @@ public:
*/
void drop() override;
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name);
bool supportsIndexForIn() const override { return true; }
/// Add a part to the queue of parts whose data needs to be checked in a background thread.


@ -127,7 +127,7 @@ public:
void drop() override;
void rename(const String & new_path_to_db, const String & new_name);
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name);
private:
String path;


@ -145,7 +145,7 @@ static ASTPtr buildWhereExpression(const ASTs & functions)
BlockInputStreamPtr getVirtualColumnsBlocks(ASTPtr query, const Block & input, const Context & context)
{
const ASTSelectQuery & select = typeid_cast<ASTSelectQuery & >(*query);
if (!select.where_expression)
if (!select.where_expression && !select.prewhere_expression)
return new OneBlockInputStream(input);
ASTPtr new_query = new ASTSelectQuery();
@ -164,7 +164,10 @@ BlockInputStreamPtr getVirtualColumnsBlocks(ASTPtr query, const Block & input, c
select_list.children.push_back(new ASTIdentifier(StringRange(), columns[i]));
std::vector<ASTPtr> functions;
if (select.where_expression)
extractFunctions(select.where_expression, columns, functions);
if (select.prewhere_expression)
extractFunctions(select.prewhere_expression, columns, functions);
new_select.where_expression = buildWhereExpression(functions);
if (new_select.select_expression_list)
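The change above makes getVirtualColumnsBlocks consider both WHERE and PREWHERE: functions over the virtual columns are extracted from whichever of the two expressions is present and are then joined into one synthetic WHERE. A toy sketch of that combining step, with plain strings standing in for AST nodes (buildCombinedCondition and the example predicates are illustrative, not the real API):

#include <iostream>
#include <string>
#include <vector>

/// Merge predicates gathered from WHERE and PREWHERE (either list may be empty)
/// into a single conjunction, the way the hunk above feeds both into buildWhereExpression.
std::string buildCombinedCondition(const std::vector<std::string> & where_parts,
                                   const std::vector<std::string> & prewhere_parts)
{
    std::vector<std::string> all(where_parts);
    all.insert(all.end(), prewhere_parts.begin(), prewhere_parts.end());
    if (all.empty())
        return "1";   /// no usable condition: every candidate value passes
    std::string res = all.front();
    for (size_t i = 1; i < all.size(); ++i)
        res += " AND " + all[i];
    return res;
}

int main()
{
    std::cout << buildCombinedCondition({"_part = '20140729_1_1_0'"}, {"_replicated = 1"}) << "\n";
    /// prints: _part = '20140729_1_1_0' AND _replicated = 1
}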


@ -59,9 +59,9 @@ void InterpreterRenameQuery::execute()
context.assertTableDoesntExist(to_database_name, to_table_name);
/// Notify the table that it is being renamed. If the table does not support renaming, an exception is thrown.
table->rename(path + "data/" + to_database_name_escaped + "/", to_table_name);
table->rename(path + "data/" + to_database_name_escaped + "/", to_database_name, to_table_name);
/// Write the new metadata file.
{


@ -5,6 +5,7 @@
#include <Yandex/ApplicationServerExt.h>
#include <statdaemons/ConfigProcessor.h>
#include <statdaemons/stdext.h>
#include <DB/Interpreters/loadMetadata.h>
#include <DB/Storages/StorageSystemNumbers.h>
@ -372,9 +373,7 @@ int Server::main(const std::vector<std::string> & args)
global_context->setDefaultReplicaName(config().getString("replica_name"));
std::string users_config_path = config().getString("users_config", config().getString("config-file", "config.xml"));
auto users_config_reloader = std::unique_ptr<UsersConfigReloader>{
new UsersConfigReloader(users_config_path, global_context.get())
};
auto users_config_reloader = stdext::make_unique<UsersConfigReloader>(users_config_path, global_context.get());
/// The maximum number of concurrently executing queries.
global_context->getProcessList().setMaxSize(config().getInt("max_concurrent_queries", 0));
@ -411,7 +410,7 @@ int Server::main(const std::vector<std::string> & args)
{
const auto profile_events_transmitter = config().getBool("use_graphite", true)
? std::unique_ptr<ProfileEventsTransmitter>{new ProfileEventsTransmitter{}}
? stdext::make_unique<ProfileEventsTransmitter>()
: nullptr;
bool use_olap_server = config().getBool("use_olap_http_server", false);
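Both hunks above replace explicit std::unique_ptr construction with stdext::make_unique from statdaemons/stdext.h. That header is not shown in this commit; presumably the helper is the familiar pre-C++14 substitute for std::make_unique, roughly like this sketch (the signature below is an assumption, not quoted from the header):

#include <memory>
#include <utility>

namespace stdext
{
    /// Hypothetical pre-C++14 make_unique: forwards the arguments to T's constructor
    /// and wraps the result in std::unique_ptr in a single expression.
    template <typename T, typename... Args>
    std::unique_ptr<T> make_unique(Args &&... args)
    {
        return std::unique_ptr<T>(new T(std::forward<Args>(args)...));
    }
}

Compared with std::unique_ptr<T>{new T{...}}, the helper avoids naming the type twice, which is why the Server.cpp call sites above collapse to a single line.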


@ -317,12 +317,16 @@ void MergeTreeData::clearOldParts()
}
}
void MergeTreeData::setPath(const String & new_full_path)
void MergeTreeData::setPath(const String & new_full_path, bool move_data)
{
if (move_data)
{
Poco::File(full_path).renameTo(new_full_path);
full_path = new_full_path;
/// If the data does not need to be moved, it means someone else has already moved it. We assume they have also reset the caches.
context.resetCaches();
}
full_path = new_full_path;
}
void MergeTreeData::dropAllData()
@ -716,14 +720,67 @@ void MergeTreeData::replaceParts(const DataPartsVector & remove, const DataParts
}
}
void MergeTreeData::renameAndDetachPart(DataPartPtr part, const String & prefix)
void MergeTreeData::renameAndDetachPart(DataPartPtr part, const String & prefix, bool restore_covered)
{
LOG_INFO(log, "Renaming " << part->name << " to " << prefix << part->name << " and detaching it.");
Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
Poco::ScopedLock<Poco::FastMutex> lock_all(all_data_parts_mutex);
if (!all_data_parts.erase(part))
throw Exception("No such data part", ErrorCodes::NO_SUCH_DATA_PART);
data_parts.erase(part);
part->renameAddPrefix(prefix);
if (restore_covered)
{
auto it = all_data_parts.lower_bound(part);
Strings restored;
bool error = false;
UInt64 pos = part->left;
if (it != all_data_parts.begin())
{
--it;
if (part->contains(**it))
{
if ((*it)->left != part->left)
error = true;
data_parts.insert(*it);
pos = (*it)->right + 1;
restored.push_back((*it)->name);
}
else
error = true;
++it;
}
else
error = true;
for (; it != all_data_parts.end() && part->contains(**it); ++it)
{
if ((*it)->left < pos)
continue;
if ((*it)->left > pos)
error = true;
data_parts.insert(*it);
pos = (*it)->right + 1;
restored.push_back((*it)->name);
}
if (pos != part->right + 1)
error = true;
for (const String & name : restored)
{
LOG_INFO(log, "Activated part " << name);
}
if (error)
LOG_ERROR(log, "The set of parts restored in place of " << part->name << " looks incomplete. There might or might not be a data loss.");
}
}
MergeTreeData::DataParts MergeTreeData::getDataParts()
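The restore_covered branch above reactivates the inactive parts that were merged into the part being detached, and flags an error unless those parts tile the detached part's [left, right] block range exactly: pos starts at part->left, every accepted part must begin at pos and advances pos to its right + 1, and at the end pos must equal part->right + 1. A reduced, self-contained sketch of that contiguity check, using bare integer pairs instead of DataPart objects (tilesExactly is an illustrative name):

#include <cstdint>
#include <iostream>
#include <utility>
#include <vector>

/// True if the sorted, non-overlapping blocks exactly cover [left, right] with no gaps,
/// mirroring the pos/error bookkeeping in the restore_covered branch above.
bool tilesExactly(uint64_t left, uint64_t right,
                  const std::vector<std::pair<uint64_t, uint64_t>> & blocks)
{
    uint64_t pos = left;
    for (const auto & block : blocks)
    {
        if (block.first != pos)   /// a gap (or overlap) at the left edge of this block
            return false;
        pos = block.second + 1;   /// the next block must start right after this one
    }
    return pos == right + 1;      /// the last block must reach the right edge exactly
}

int main()
{
    std::cout << tilesExactly(1, 10, {{1, 4}, {5, 10}}) << "\n";   /// 1: complete cover
    std::cout << tilesExactly(1, 10, {{1, 4}, {6, 10}}) << "\n";   /// 0: block 5 is missing
}

The real code is more lenient in one respect: parts that start before pos are simply skipped (they are already covered by a previously restored part), and an incomplete cover is only logged rather than thrown.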


@ -251,6 +251,10 @@ MergeTreeData::DataPartPtr MergeTreeDataMerger::mergeParts(
{
LOG_DEBUG(log, "Merging " << parts.size() << " parts: from " << parts.front()->name << " to " << parts.back()->name << " into " << merged_name);
String merged_dir = data.getFullPath() + merged_name;
if (Poco::File(merged_dir).exists())
throw Exception("Directory " + merged_dir + " already exists", ErrorCodes::DIRECTORY_ALREADY_EXISTS);
NameSet union_columns_set;
for (const MergeTreeData::DataPartPtr & part : parts)
{


@ -6,6 +6,7 @@
#include <DB/DataStreams/ConcatBlockInputStream.h>
#include <DB/DataStreams/CollapsingFinalBlockInputStream.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/Common/VirtualColumnUtils.h>
namespace DB
{
@ -19,6 +20,19 @@ MergeTreeDataSelectExecutor::MergeTreeDataSelectExecutor(MergeTreeData & data_)
}
/// Build a block consisting only of the possible values of the virtual columns
static Block getBlockWithVirtualColumns(const MergeTreeData::DataPartsVector & parts)
{
Block res;
ColumnWithNameAndType _part(new ColumnString, new DataTypeString, "_part");
for (const auto & part : parts)
_part.column->insert(part->name);
res.insert(_part);
return res;
}
BlockInputStreams MergeTreeDataSelectExecutor::read(
const Names & column_names_to_return,
ASTPtr query,
@ -27,30 +41,60 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
size_t max_block_size,
unsigned threads)
{
data.check(column_names_to_return);
MergeTreeData::DataPartsVector parts;
{
auto parts_set = data.getDataParts();
parts.assign(parts_set.begin(), parts_set.end());
}
/// If the query has constraints on the _part virtual column, select only the parts matching them.
Names virt_column_names, real_column_names;
for (const auto & it : column_names_to_return)
if (it != "_part")
real_column_names.push_back(it);
else
virt_column_names.push_back(it);
Block virtual_columns_block = getBlockWithVirtualColumns(parts);
BlockInputStreamPtr virtual_columns;
/// If at least one virtual column was requested, try to index by it
if (!virt_column_names.empty())
virtual_columns = VirtualColumnUtils::getVirtualColumnsBlocks(query->clone(), virtual_columns_block, data.context);
else /// Otherwise, consider all possible values allowed
virtual_columns = new OneBlockInputStream(virtual_columns_block);
std::multiset<String> values = VirtualColumnUtils::extractSingleValueFromBlocks<String>(virtual_columns, "_part");
data.check(real_column_names);
processed_stage = QueryProcessingStage::FetchColumns;
PKCondition key_condition(query, data.context, data.getColumnsList(), data.getSortDescription());
PKCondition date_condition(query, data.context, data.getColumnsList(), SortDescription(1, SortColumnDescription(data.date_column_name, 1)));
MergeTreeData::DataPartsVector parts;
/// Select the parts that may contain data satisfying date_condition.
/// Select the parts that may contain data satisfying date_condition and that match the condition on _part.
{
MergeTreeData::DataParts data_parts = data.getDataParts();
auto prev_parts = parts;
parts.clear();
for (MergeTreeData::DataParts::iterator it = data_parts.begin(); it != data_parts.end(); ++it)
for (const auto & part : prev_parts)
{
Field left = static_cast<UInt64>((*it)->left_date);
Field right = static_cast<UInt64>((*it)->right_date);
if (values.find(part->name) == values.end())
continue;
if (date_condition.mayBeTrueInRange(&left, &right))
parts.push_back(*it);
Field left = static_cast<UInt64>(part->left_date);
Field right = static_cast<UInt64>(part->right_date);
if (!date_condition.mayBeTrueInRange(&left, &right))
continue;
parts.push_back(part);
}
}
/// Sampling.
Names column_names_to_read = column_names_to_return;
Names column_names_to_read = real_column_names;
UInt64 sampling_column_value_limit = 0;
typedef Poco::SharedPtr<ASTFunction> ASTFunctionPtr;
ASTFunctionPtr filter_function;
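Taken together, the hunk above now prunes the part list twice before any mark ranges are computed: a candidate part survives only if the query's conditions on _part allow its name (evaluated over the tiny one-column block from getBlockWithVirtualColumns), and only then is its date range tested against date_condition. A simplified model of that double filter, with a plain struct and a caller-supplied predicate standing in for DataPart and PKCondition (all names here are illustrative):

#include <functional>
#include <iostream>
#include <set>
#include <string>
#include <vector>

/// Toy stand-in for MergeTreeData::DataPart: a name plus a min/max date.
struct Part
{
    std::string name;
    int left_date;
    int right_date;
};

/// Keep only parts whose name survived the virtual-column filter and whose
/// date range may still satisfy the date condition.
std::vector<Part> prune(
    const std::vector<Part> & parts,
    const std::multiset<std::string> & allowed_names,
    const std::function<bool(int, int)> & date_may_be_true)
{
    std::vector<Part> res;
    for (const auto & part : parts)
    {
        if (allowed_names.find(part.name) == allowed_names.end())
            continue;   /// filtered out by the condition on _part
        if (!date_may_be_true(part.left_date, part.right_date))
            continue;   /// the date range cannot match
        res.push_back(part);
    }
    return res;
}

int main()
{
    std::vector<Part> parts = {{"201407_1_2_0", 20140701, 20140715}, {"201406_3_4_1", 20140601, 20140630}};
    std::multiset<std::string> allowed = {"201407_1_2_0"};   /// e.g. WHERE _part = '201407_1_2_0'
    auto pruned = prune(parts, allowed, [](int l, int r) { return r >= 20140710; });
    for (const auto & part : pruned)
        std::cout << part.name << "\n";   /// only 201407_1_2_0 remains
}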
@ -185,7 +229,8 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
max_block_size,
settings.use_uncompressed_cache,
prewhere_actions,
prewhere_column);
prewhere_column,
!virt_column_names.empty());
}
else
{
@ -196,7 +241,8 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
max_block_size,
settings.use_uncompressed_cache,
prewhere_actions,
prewhere_column);
prewhere_column,
!virt_column_names.empty());
}
if (select.sample_size)
@ -220,7 +266,8 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreads(
size_t max_block_size,
bool use_uncompressed_cache,
ExpressionActionsPtr prewhere_actions,
const String & prewhere_column)
const String & prewhere_column,
bool add_virtual_column)
{
/// Shuffle the parts, just in case.
std::random_shuffle(parts.begin(), parts.end());
@ -282,6 +329,9 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreads(
data.getFullPath() + part.data_part->name + '/', max_block_size, column_names, data,
part.data_part, part.ranges, use_uncompressed_cache,
prewhere_actions, prewhere_column));
if (add_virtual_column)
streams.back() = new AddingConstColumnBlockInputStream<String>(
streams.back(), new DataTypeString, part.data_part->name, "_part");
need_marks -= marks_in_part;
parts.pop_back();
sum_marks_in_parts.pop_back();
@ -312,6 +362,9 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreads(
data.getFullPath() + part.data_part->name + '/', max_block_size, column_names, data,
part.data_part, ranges_to_get_from_part, use_uncompressed_cache,
prewhere_actions, prewhere_column));
if (add_virtual_column)
streams.back() = new AddingConstColumnBlockInputStream<String>(
streams.back(), new DataTypeString, part.data_part->name, "_part");
}
if (streams.size() == 1)
@ -334,7 +387,8 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreadsFinal
size_t max_block_size,
bool use_uncompressed_cache,
ExpressionActionsPtr prewhere_actions,
const String & prewhere_column)
const String & prewhere_column,
bool add_virtual_column)
{
size_t sum_marks = 0;
for (size_t i = 0; i < parts.size(); ++i)
@ -358,6 +412,9 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreadsFinal
data.getFullPath() + part.data_part->name + '/', max_block_size, column_names, data,
part.data_part, part.ranges, use_uncompressed_cache,
prewhere_actions, prewhere_column);
if (add_virtual_column)
source_stream = new AddingConstColumnBlockInputStream<String>(
source_stream, new DataTypeString, part.data_part->name, "_part");
to_collapse.push_back(new ExpressionBlockInputStream(source_stream, data.getPrimaryExpression()));
}


@ -523,15 +523,15 @@ size_t StorageLog::marksCount()
}
void StorageLog::rename(const String & new_path_to_db, const String & new_name)
void StorageLog::rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name)
{
Poco::ScopedWriteRWLock lock(rwlock);
/// Rename the directory with the data.
Poco::File(path + escapeForFileName(name)).renameTo(new_path_to_db + escapeForFileName(new_name));
Poco::File(path + escapeForFileName(name)).renameTo(new_path_to_db + escapeForFileName(new_table_name));
path = new_path_to_db;
name = new_name;
name = new_table_name;
for (Files_t::iterator it = files.begin(); it != files.end(); ++it)
{


@ -89,14 +89,14 @@ void StorageMergeTree::drop()
data.dropAllData();
}
void StorageMergeTree::rename(const String & new_path_to_db, const String & new_name)
void StorageMergeTree::rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name)
{
std::string new_full_path = new_path_to_db + escapeForFileName(new_name) + '/';
std::string new_full_path = new_path_to_db + escapeForFileName(new_table_name) + '/';
data.setPath(new_full_path);
data.setPath(new_full_path, true);
path = new_path_to_db;
name = new_name;
name = new_table_name;
full_path = new_full_path;
increment.setPath(full_path + "increment.txt");


@ -6,6 +6,7 @@
#include <DB/IO/WriteBufferFromOStream.h>
#include <DB/IO/ReadBufferFromString.h>
#include <DB/Interpreters/InterpreterAlterQuery.h>
#include <DB/Common/VirtualColumnUtils.h>
#include <time.h>
namespace DB
@ -461,7 +462,7 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
for (MergeTreeData::DataPartPtr part : unexpected_parts)
{
LOG_ERROR(log, "Renaming unexpected part " << part->name << " to ignored_" + part->name);
data.renameAndDetachPart(part, "ignored_");
data.renameAndDetachPart(part, "ignored_", true);
}
}
@ -511,6 +512,12 @@ void StorageReplicatedMergeTree::checkPartAndAddToZooKeeper(MergeTreeData::DataP
checksums.checkEqual(part->checksums, true);
}
if (zookeeper->exists(replica_path + "/parts/" + part->name))
{
LOG_ERROR(log, "checkPartAndAddToZooKeeper: node " << replica_path + "/parts/" + part->name << " already exists");
return;
}
ops.push_back(new zkutil::Op::Check(
zookeeper_path + "/columns",
expected_columns_version));
@ -533,6 +540,8 @@ void StorageReplicatedMergeTree::checkPartAndAddToZooKeeper(MergeTreeData::DataP
void StorageReplicatedMergeTree::clearOldParts()
{
auto table_lock = lockStructure(false);
MergeTreeData::DataPartsVector parts = data.grabOldParts();
size_t count = parts.size();
@ -545,6 +554,8 @@ void StorageReplicatedMergeTree::clearOldParts()
{
MergeTreeData::DataPartPtr part = parts.back();
LOG_DEBUG(log, "Removing " << part->name);
zkutil::Ops ops;
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + part->name + "/columns", -1));
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + part->name + "/checksums", -1));
@ -832,7 +843,7 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, Backgro
}
}
auto table_lock = lockStructure(true);
auto table_lock = lockStructure(false);
MergeTreeData::Transaction transaction;
MergeTreeData::DataPartPtr part = merger.mergeParts(parts, entry.new_part_name, &transaction);
@ -841,6 +852,10 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, Backgro
checkPartAndAddToZooKeeper(part, ops);
zookeeper->multi(ops);
/** With ZCONNECTIONLOSS or ZOPERATIONTIMEOUT we may needlessly roll back local changes to the parts.
* This is not a problem, because in that case the merge will remain in the queue, and we will try again.
*/
transaction.commit();
merge_selecting_event.set();
@ -1194,6 +1209,8 @@ void StorageReplicatedMergeTree::alterThread()
changed = true;
}
MergeTreeData::DataParts parts;
/// If the description of the columns has changed, update the table structure locally.
if (changed)
{
@ -1207,6 +1224,9 @@ void StorageReplicatedMergeTree::alterThread()
unreplicated_data->setColumnsList(columns);
columns_version = stat.version;
LOG_INFO(log, "Applied changes to table.");
/// The list of parts must be obtained under the table lock to avoid a race condition with a merge.
parts = data.getDataParts();
}
else
{
@ -1222,7 +1242,10 @@ void StorageReplicatedMergeTree::alterThread()
int changed_parts = 0;
auto parts = data.getDataParts();
if (!changed)
parts = data.getDataParts();
auto table_lock = lockStructure(false);
for (const MergeTreeData::DataPartPtr & part : parts)
{
@ -1379,7 +1402,7 @@ void StorageReplicatedMergeTree::partCheckThread()
ActiveDataPartSet::parsePartName(part_name, part_info);
/** We will check only parts that were not obtained as a result of a merge.
* For parts obtained as a result of a merge, such a check would be incorrect,
* because the merged part may not yet exist on any replica.
*/
if (part_info.left == part_info.right)
@ -1461,6 +1484,8 @@ void StorageReplicatedMergeTree::partCheckThread()
/// We have this part, and it is active.
else if (part->name == part_name)
{
auto table_lock = lockStructure(false);
/// If the part is in ZooKeeper, check its data against its checksums, and those against ZooKeeper.
if (zookeeper->exists(replica_path + "/parts/" + part_name))
{
@ -1781,11 +1806,60 @@ BlockInputStreams StorageReplicatedMergeTree::read(
size_t max_block_size,
unsigned threads)
{
BlockInputStreams res = reader.read(column_names, query, settings, processed_stage, max_block_size, threads);
Names virt_column_names;
Names real_column_names;
for (const auto & it : column_names)
if (it == "_replicated")
virt_column_names.push_back(it);
else
real_column_names.push_back(it);
if (unreplicated_reader)
Block virtual_columns_block;
ColumnUInt8 * column = new ColumnUInt8(2);
ColumnPtr column_ptr = column;
column->getData()[0] = 0;
column->getData()[1] = 1;
virtual_columns_block.insert(ColumnWithNameAndType(column_ptr, new DataTypeUInt8, "_replicated"));
BlockInputStreamPtr virtual_columns;
/// If at least one virtual column was requested, try to index by it
if (!virt_column_names.empty())
virtual_columns = VirtualColumnUtils::getVirtualColumnsBlocks(query->clone(), virtual_columns_block, context);
else /// Otherwise, consider all possible values allowed
virtual_columns = new OneBlockInputStream(virtual_columns_block);
std::multiset<UInt8> values = VirtualColumnUtils::extractSingleValueFromBlocks<UInt8>(virtual_columns, "_replicated");
BlockInputStreams res;
if (values.count(1))
{
BlockInputStreams res2 = unreplicated_reader->read(column_names, query, settings, processed_stage, max_block_size, threads);
res = reader.read(real_column_names, query, settings, processed_stage, max_block_size, threads);
for (auto & virtual_column : virt_column_names)
{
if (virtual_column == "_replicated")
{
for (auto & stream : res)
stream = new AddingConstColumnBlockInputStream<UInt8>(stream, new DataTypeUInt8, 1, "_replicated");
}
}
}
if (unreplicated_reader && values.count(0))
{
BlockInputStreams res2 = unreplicated_reader->read(real_column_names, query, settings, processed_stage, max_block_size, threads);
for (auto & virtual_column : virt_column_names)
{
if (virtual_column == "_replicated")
{
for (auto & stream : res2)
stream = new AddingConstColumnBlockInputStream<UInt8>(stream, new DataTypeUInt8, 0, "_replicated");
}
}
res.insert(res.begin(), res2.begin(), res2.end());
}
@ -1935,6 +2009,21 @@ void StorageReplicatedMergeTree::drop()
data.dropAllData();
}
void StorageReplicatedMergeTree::rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name)
{
std::string new_full_path = new_path_to_db + escapeForFileName(new_table_name) + '/';
data.setPath(new_full_path, true);
if (unreplicated_data)
unreplicated_data->setPath(new_full_path + "unreplicated/", false);
database_name = new_database_name;
table_name = new_table_name;
full_path = new_full_path;
/// TODO: The logger names could be updated.
}
void StorageReplicatedMergeTree::LogEntry::writeText(WriteBuffer & out) const
{
writeString("format version: 1\n", out);


@ -353,13 +353,13 @@ void StorageTinyLog::addFile(const String & column_name, const IDataType & type,
}
void StorageTinyLog::rename(const String & new_path_to_db, const String & new_name)
void StorageTinyLog::rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name)
{
/// Rename the directory with the data.
Poco::File(path + escapeForFileName(name)).renameTo(new_path_to_db + escapeForFileName(new_name));
Poco::File(path + escapeForFileName(name)).renameTo(new_path_to_db + escapeForFileName(new_table_name));
path = new_path_to_db;
name = new_name;
name = new_table_name;
for (Files_t::iterator it = files.begin(); it != files.end(); ++it)
it->second.data_file = Poco::File(path + escapeForFileName(name) + '/' + Poco::Path(it->second.data_file.path()).getFileName());


@ -366,6 +366,9 @@ int32_t ZooKeeper::trySet(const std::string & path, const std::string & data,
int32_t ZooKeeper::multiImpl(const Ops & ops_, OpResultsPtr * out_results_)
{
if (ops_.empty())
return ZOK;
size_t count = ops_.size();
OpResultsPtr out_results(new OpResults(count));