Unification of serde of data types: development [#CLICKHOUSE-2838].

Alexey Milovidov 2017-12-03 05:15:35 +03:00
parent a9ddaa2b19
commit 1aa99092c3
4 changed files with 39 additions and 77 deletions


@@ -23,8 +23,6 @@
#include <IO/Operators.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeUUID.h>
#include <DataTypes/DataTypeFixedString.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeNested.h>
#include <DataTypes/DataTypeArray.h>
@@ -179,27 +177,6 @@ MergeTreeData::MergeTreeData(
}
void MergeTreeData::checkNoMultidimensionalArrays(const NamesAndTypesList & columns, bool attach) const
{
for (const auto & column : columns)
{
if (const auto * array = dynamic_cast<const DataTypeArray *>(column.type.get()))
{
if (dynamic_cast<const DataTypeArray *>(array->getNestedType().get()))
{
std::string message =
"Column " + column.name + " is a multidimensional array. "
+ "Multidimensional arrays are not supported in MergeTree engines.";
if (!attach)
throw Exception(message, ErrorCodes::BAD_TYPE_OF_FIELD);
else
LOG_ERROR(log, message);
}
}
}
}
void MergeTreeData::initPrimaryKey()
{
if (!primary_expr_ast)
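Note: the check removed above rejected nested array columns, i.e. types of the form Array(Array(T)). A minimal standalone sketch of the same detection logic, for orientation only (the free function below is not part of this commit):

#include <DataTypes/DataTypeArray.h>

/// True for types of the form Array(Array(T)); the removed check threw
/// BAD_TYPE_OF_FIELD for them on CREATE/ALTER and only logged an error on ATTACH.
static bool isMultidimensionalArray(const IDataType & type)
{
    if (const auto * array = dynamic_cast<const DataTypeArray *>(&type))
        return dynamic_cast<const DataTypeArray *>(array->getNestedType().get()) != nullptr;
    return false;
}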
@@ -345,7 +322,6 @@ void MergeTreeData::MergingParams::check(const NamesAndTypesList & columns) const
&& !typeid_cast<const DataTypeUInt16 *>(column.type.get())
&& !typeid_cast<const DataTypeUInt32 *>(column.type.get())
&& !typeid_cast<const DataTypeUInt64 *>(column.type.get())
&& !typeid_cast<const DataTypeUUID *>(column.type.get())
&& !typeid_cast<const DataTypeDate *>(column.type.get())
&& !typeid_cast<const DataTypeDateTime *>(column.type.get()))
throw Exception("Version column (" + version_column + ")"
@@ -703,22 +679,17 @@ void MergeTreeData::clearOldPartsFromFilesystem()
removePartsFinally(parts_to_remove);
}
void MergeTreeData::setPath(const String & new_full_path, bool move_data)
void MergeTreeData::setPath(const String & new_full_path)
{
if (move_data)
{
if (Poco::File{new_full_path}.exists())
throw Exception{
"Target path already exists: " + new_full_path,
/// @todo existing target can also be a file, not directory
ErrorCodes::DIRECTORY_ALREADY_EXISTS
};
Poco::File(full_path).renameTo(new_full_path);
/// If we don't need to move the data, it means someone else has already moved it.
/// We hope that he has also reset the caches.
context.dropCaches();
}
if (Poco::File{new_full_path}.exists())
throw Exception{
"Target path already exists: " + new_full_path,
/// @todo existing target can also be a file, not directory
ErrorCodes::DIRECTORY_ALREADY_EXISTS};
Poco::File(full_path).renameTo(new_full_path);
context.dropCaches();
full_path = new_full_path;
}
@@ -762,7 +733,6 @@ bool isMetadataOnlyConversion(const IDataType * from, const IDataType * to)
{ typeid(DataTypeDateTime), typeid(DataTypeUInt32) },
{ typeid(DataTypeUInt32), typeid(DataTypeDateTime) },
{ typeid(DataTypeDate), typeid(DataTypeUInt16) },
{ typeid(DataTypeUUID), typeid(DataTypeUUID) },
{ typeid(DataTypeUInt16), typeid(DataTypeDate) },
};
@@ -808,9 +778,6 @@ void MergeTreeData::checkAlter(const AlterCommands & commands)
auto new_column_defaults = column_defaults;
commands.apply(new_columns, new_materialized_columns, new_alias_columns, new_column_defaults);
checkNoMultidimensionalArrays(new_columns, /* attach = */ false);
checkNoMultidimensionalArrays(new_materialized_columns, /* attach = */ false);
/// Set of columns that shouldn't be altered.
NameSet columns_alter_forbidden;
@@ -1160,7 +1127,7 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(
* temporary column name ('converting_column_name') created in 'createConvertExpression' method
* will have old name of shared offsets for arrays.
*/
MergedColumnOnlyOutputStream out(*this, full_path + part->name + '/', true, compression_settings, true /* skip_offsets */);
MergedColumnOnlyOutputStream out(*this, full_path + part->name + '/', true /* sync */, compression_settings, false /* skip_offsets */);
in.readPrefix();
out.writePrefix();
@@ -1224,19 +1191,19 @@ void MergeTreeData::AlterDataPartTransaction::commit()
/// before, i.e. they do not have older versions.
/// 1) Rename the old files.
for (auto it : rename_map)
for (const auto & from_to : rename_map)
{
String name = it.second.empty() ? it.first : it.second;
String name = from_to.second.empty() ? from_to.first : from_to.second;
Poco::File file{path + name};
if (file.exists())
file.renameTo(path + name + ".tmp2");
}
/// 2) Move new files in the place of old and update the metadata in memory.
for (auto it : rename_map)
for (const auto & from_to : rename_map)
{
if (!it.second.empty())
Poco::File{path + it.first}.renameTo(path + it.second);
if (!from_to.second.empty())
Poco::File{path + from_to.first}.renameTo(path + from_to.second);
}
auto & mutable_part = const_cast<DataPart &>(*data_part);
@@ -1244,9 +1211,9 @@ void MergeTreeData::AlterDataPartTransaction::commit()
mutable_part.columns = new_columns;
/// 3) Delete the old files.
for (auto it : rename_map)
for (const auto & from_to : rename_map)
{
String name = it.second.empty() ? it.first : it.second;
String name = from_to.second.empty() ? from_to.first : from_to.second;
Poco::File file{path + name + ".tmp2"};
if (file.exists())
file.remove();
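Note: taken together, the three loops rewritten above (now iterating by const reference instead of copying map entries) implement a three-phase file swap. A condensed standalone sketch of the same protocol, with freestanding names that are not part of the commit:

#include <Poco/File.h>
#include <map>
#include <string>

/// Hypothetical freestanding version of the swap done by
/// AlterDataPartTransaction::commit(). rename_map maps a source file name to a
/// destination name; an empty destination means the source is simply removed.
void commitRenames(const std::string & path, const std::map<std::string, std::string> & rename_map)
{
    /// 1) Park the files that are about to be replaced (or removed) under ".tmp2".
    for (const auto & from_to : rename_map)
    {
        const std::string name = from_to.second.empty() ? from_to.first : from_to.second;
        Poco::File file{path + name};
        if (file.exists())
            file.renameTo(path + name + ".tmp2");
    }

    /// 2) Move the freshly written files into their final names.
    for (const auto & from_to : rename_map)
        if (!from_to.second.empty())
            Poco::File{path + from_to.first}.renameTo(path + from_to.second);

    /// 3) Drop the parked ".tmp2" copies.
    for (const auto & from_to : rename_map)
    {
        const std::string name = from_to.second.empty() ? from_to.first : from_to.second;
        Poco::File file{path + name + ".tmp2"};
        if (file.exists())
            file.remove();
    }
}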
@@ -1931,7 +1898,6 @@ size_t MergeTreeData::getPartitionSize(const std::string & partition_id) const
size_t size = 0;
Poco::DirectoryIterator end;
Poco::DirectoryIterator end2;
for (Poco::DirectoryIterator it(full_path); it != end; ++it)
{
@@ -1942,7 +1908,7 @@ size_t MergeTreeData::getPartitionSize(const std::string & partition_id) const
continue;
const auto part_path = it.path().absolute().toString();
for (Poco::DirectoryIterator it2(part_path); it2 != end2; ++it2)
for (Poco::DirectoryIterator it2(part_path); it2 != end; ++it2)
{
const auto part_file_path = it2.path().absolute().toString();
size += Poco::File(part_file_path).getSize();
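Note: the two hunks above remove the redundant end2 iterator; a default-constructed Poco::DirectoryIterator is the end sentinel for any directory, so a single object serves both the outer and the inner loop. A minimal standalone sketch of the pattern (the helper name is made up):

#include <Poco/DirectoryIterator.h>
#include <Poco/File.h>
#include <string>

/// Hypothetical helper: total size of the files one level below `dir`,
/// mirroring the nested loops of getPartitionSize.
size_t sizeOfFilesInSubdirectories(const std::string & dir)
{
    size_t size = 0;
    Poco::DirectoryIterator end;    /// one default-constructed sentinel is enough
    for (Poco::DirectoryIterator it(dir); it != end; ++it)
    {
        if (!Poco::File(it.path()).isDirectory())
            continue;
        for (Poco::DirectoryIterator it2(it.path()); it2 != end; ++it2)
            size += Poco::File(it2.path()).getSize();
    }
    return size;
}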


@@ -273,27 +273,27 @@ public:
/// Otherwise, partition_expr_ast is used for partitioning.
/// require_part_metadata - should checksums.txt and columns.txt exist in the part directory.
/// attach - whether the existing table is attached or the new table is created.
MergeTreeData( const String & database_, const String & table_,
const String & full_path_, NamesAndTypesListPtr columns_,
const NamesAndTypesList & materialized_columns_,
const NamesAndTypesList & alias_columns_,
const ColumnDefaults & column_defaults_,
Context & context_,
const ASTPtr & primary_expr_ast_,
const String & date_column_name,
const ASTPtr & partition_expr_ast_,
const ASTPtr & sampling_expression_, /// nullptr, if sampling is not supported.
const MergingParams & merging_params_,
const MergeTreeSettings & settings_,
const String & log_name_,
bool require_part_metadata_,
bool attach,
BrokenPartCallback broken_part_callback_ = [](const String &){});
MergeTreeData(const String & database_, const String & table_,
const String & full_path_, NamesAndTypesListPtr columns_,
const NamesAndTypesList & materialized_columns_,
const NamesAndTypesList & alias_columns_,
const ColumnDefaults & column_defaults_,
Context & context_,
const ASTPtr & primary_expr_ast_,
const String & date_column_name,
const ASTPtr & partition_expr_ast_,
const ASTPtr & sampling_expression_, /// nullptr, if sampling is not supported.
const MergingParams & merging_params_,
const MergeTreeSettings & settings_,
const String & log_name_,
bool require_part_metadata_,
bool attach,
BrokenPartCallback broken_part_callback_ = [](const String &){});
/// Load the set of data parts from disk. Call once - immediately after the object is created.
void loadDataParts(bool skip_sanity_checks);
bool supportsSampling() const { return !!sampling_expression; }
bool supportsSampling() const { return sampling_expression != nullptr; }
bool supportsPrewhere() const { return true; }
bool supportsFinal() const
@@ -410,7 +410,7 @@ public:
/// Moves the entire data directory.
/// Flushes the uncompressed blocks cache and the marks cache.
/// Must be called with locked lockStructureForAlter().
void setPath(const String & full_path, bool move_data);
void setPath(const String & full_path);
/// Check if the ALTER can be performed:
/// - all needed columns are present.
@@ -492,6 +492,7 @@ public:
return total_size;
}
/// Calculates column sizes in compressed form for the current state of data_parts.
void recalculateColumnSizes()
{
std::lock_guard<std::mutex> lock{data_parts_mutex};
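Note: the hunk above only adds the documentation comment; the method body is cut off after the lock. For orientation, a rough sketch of what recomputing sizes for the current state of data_parts amounts to (the member and helper names below are hypothetical, not taken from this diff):

void recalculateColumnSizes()
{
    std::lock_guard<std::mutex> lock{data_parts_mutex};

    column_sizes.clear();                          /// hypothetical member
    for (const auto & part : data_parts)
        addPartContributionToColumnSizes(part);    /// hypothetical helper
}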
@@ -640,11 +641,6 @@ private:
/// The same for clearOldTemporaryDirectories.
std::mutex clear_old_temporary_directories_mutex;
/// Check that columns list doesn't contain multidimensional arrays.
/// If attach is true (attaching an existing table), writes an error message to log.
/// Otherwise (new table or alter) throws an exception.
void checkNoMultidimensionalArrays(const NamesAndTypesList & columns, bool attach) const;
void initPrimaryKey();
void initPartitionKey();


@@ -134,7 +134,7 @@ void StorageMergeTree::rename(const String & new_path_to_db, const String & /*ne
{
std::string new_full_path = new_path_to_db + escapeForFileName(new_table_name) + '/';
data.setPath(new_full_path, true);
data.setPath(new_full_path);
path = new_path_to_db;
table_name = new_table_name;


@@ -2855,7 +2855,7 @@ void StorageReplicatedMergeTree::rename(const String & new_path_to_db, const Str
{
std::string new_full_path = new_path_to_db + escapeForFileName(new_table_name) + '/';
data.setPath(new_full_path, true);
data.setPath(new_full_path);
database_name = new_database_name;
table_name = new_table_name;