Unification of serde of data types: development [#CLICKHOUSE-2838].

This commit is contained in:
Alexey Milovidov 2017-12-03 01:12:27 +03:00
parent de775e9a3e
commit 5510bea3f8
7 changed files with 62 additions and 52 deletions

View File

@ -55,11 +55,6 @@
#define DEFAULT_QUERIES_QUEUE_WAIT_TIME_MS 5000 /// Maximum waiting time in the request queue.
#define DBMS_DEFAULT_BACKGROUND_POOL_SIZE 16
/// Name suffix for the column containing the array offsets.
#define ARRAY_SIZES_COLUMN_NAME_SUFFIX ".size"
/// And NULL map.
#define NULL_MAP_COLUMN_NAME_SUFFIX ".null"
#define DBMS_MIN_REVISION_WITH_CLIENT_INFO 54032
#define DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE 54058
#define DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO 54060

View File

@ -67,9 +67,9 @@ String IDataType::getFileNameForStream(const String & column_name, const IDataTy
for (const IDataType::Substream & elem : path)
{
if (elem.type == IDataType::Substream::NullMap)
stream_name += NULL_MAP_COLUMN_NAME_SUFFIX;
stream_name += ".null";
else if (elem.type == IDataType::Substream::ArraySizes)
stream_name = DataTypeNested::extractNestedTableName(stream_name) + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(array_level);
stream_name = DataTypeNested::extractNestedTableName(stream_name) + ".size" + toString(array_level);
else if (elem.type == IDataType::Substream::ArrayElements)
++array_level;
}

View File

@ -1272,7 +1272,7 @@ void MergeTreeData::AlterDataPartTransaction::commit()
file.remove();
}
mutable_part.size_in_bytes = MergeTreeData::DataPart::calcTotalSize(path);
mutable_part.size_in_bytes = MergeTreeData::DataPart::calculateTotalSize(path);
/// TODO: we can skip resetting caches when the column is added.
data_part->storage.context.dropCaches();

View File

@ -493,7 +493,7 @@ MergeTreeDataPart::~MergeTreeDataPart()
}
}
size_t MergeTreeDataPart::calcTotalSize(const String & from)
size_t MergeTreeDataPart::calculateTotalSize(const String & from)
{
Poco::File cur(from);
if (cur.isFile())
@ -502,7 +502,7 @@ size_t MergeTreeDataPart::calcTotalSize(const String & from)
cur.list(files);
size_t res = 0;
for (const auto & file : files)
res += calcTotalSize(from + file);
res += calculateTotalSize(from + file);
return res;
}
@ -590,7 +590,7 @@ void MergeTreeDataPart::renameAddPrefix(bool to_detached, const String & prefix)
if (to_detached)
{
/** If you need to unhook a part, and directory into which we want to rename it already exists,
/** If you need to detach a part, and directory into which we want to rename it already exists,
* we will rename to the directory with the name to which the suffix is added in the form of "_tryN".
* This is done only in the case of `to_detached`, because it is assumed that in this case the exact name does not matter.
* No more than 10 attempts are made so that there are not too many junk directories left.
@ -660,7 +660,7 @@ void MergeTreeDataPart::loadIndex()
throw Exception("Index file " + index_path + " is unexpectedly long", ErrorCodes::EXPECTED_END_OF_FILE);
}
size_in_bytes = calcTotalSize(getFullPath());
size_in_bytes = calculateTotalSize(getFullPath());
}
void MergeTreeDataPart::loadPartitionAndMinMaxIndex()
@ -756,9 +756,16 @@ void MergeTreeDataPart::loadRowsCount()
void MergeTreeDataPart::accumulateColumnSizes(ColumnToSize & column_to_size) const
{
std::shared_lock<std::shared_mutex> part_lock(columns_lock);
for (const NameAndTypePair & column : *storage.columns)
if (Poco::File(getFullPath() + escapeForFileName(column.name) + ".bin").exists())
column_to_size[column.name] += Poco::File(getFullPath() + escapeForFileName(column.name) + ".bin").getSize();
for (const NameAndTypePair & name_type : *storage.columns)
{
name_type.type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
{
Poco::File bin_file(getFullPath() + IDataType::getFileNameForStream(name_type.name, substream_path) + ".bin");
if (bin_file.exists())
column_to_size[name_type.name] += bin_file.getSize();
}, {});
}
}
void MergeTreeDataPart::loadColumns(bool require)
@ -801,12 +808,20 @@ void MergeTreeDataPart::checkConsistency(bool require_part_metadata)
if (require_part_metadata)
{
for (const NameAndTypePair & it : columns)
for (const NameAndTypePair & name_type : columns)
{
String name = escapeForFileName(it.name);
if (!checksums.files.count(name + ".mrk") ||
!checksums.files.count(name + ".bin"))
throw Exception("No .mrk or .bin file checksum for column " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
name_type.type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
{
String file_name = IDataType::getFileNameForStream(name_type.name, substream_path);
String mrk_file_name = file_name + ".mrk";
String bin_file_name = file_name + ".bin";
if (!checksums.files.count(mrk_file_name))
throw Exception("No " + mrk_file_name + " file checksum for column " + name + " in part " + path,
ErrorCodes::NO_FILE_IN_DATA_PART);
if (!checksums.files.count(bin_file_name))
throw Exception("No " + bin_file_name + " file checksum for column " + name + " in part " + path,
ErrorCodes::NO_FILE_IN_DATA_PART);
}, {});
}
}
@ -853,45 +868,44 @@ void MergeTreeDataPart::checkConsistency(bool require_part_metadata)
}
/// Check that all marks are nonempty and have the same size.
auto check_marks = [&path](const NamesAndTypesList & columns, const std::string & extension)
std::optional<size_t> marks_size;
for (const NameAndTypePair & name_type : columns)
{
ssize_t marks_size = -1;
for (const NameAndTypePair & it : columns)
name_type.type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
{
Poco::File marks_file(path + escapeForFileName(it.name) + extension);
Poco::File file(IDataType::getFileNameForStream(name_type.name, substream_path) + ".mrk");
/// When you add a new column to the table, the .mrk files are not created. We will not delete anything.
if (!marks_file.exists())
continue;
if (marks_size == -1)
/// Missing file is Ok for case when new column was added.
if (file.exists())
{
marks_size = marks_file.getSize();
size_t file_size = file.getSize();
if (0 == marks_size)
throw Exception("Part " + path + " is broken: " + marks_file.path() + " is empty.",
if (!file_size)
throw Exception("Part " + path + " is broken: " + file.path() + " is empty.",
ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
}
else
{
if (static_cast<ssize_t>(marks_file.getSize()) != marks_size)
if (!marks_size)
marks_size = file_size;
else if (file_size != *marks_size)
throw Exception("Part " + path + " is broken: marks have different sizes.",
ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
}
}
};
check_marks(columns, ".mrk");
check_marks(columns, ".null.mrk");
}, {});
}
}
}
bool MergeTreeDataPart::hasColumnFiles(const String & column) const
{
/// NOTE: For multi-streams columns we check that just first file exist.
/// That's Ok under assumption that files exist either for all or for no streams.
String prefix = getFullPath();
String escaped_column = escapeForFileName(column);
return Poco::File(prefix + escaped_column + ".bin").exists() &&
Poco::File(prefix + escaped_column + ".mrk").exists();
return Poco::File(prefix + escaped_column + ".bin").exists()
&& Poco::File(prefix + escaped_column + ".mrk").exists();
}
@ -926,7 +940,7 @@ String MergeTreeDataPart::stateToString(MergeTreeDataPart::State state)
case State::Deleting:
return "Deleting";
default:
throw Exception("Unknown part state " + std::to_string(static_cast<int>(state)), ErrorCodes::LOGICAL_ERROR);
throw Exception("Unknown part state " + toString(static_cast<int>(state)), ErrorCodes::LOGICAL_ERROR);
}
}

View File

@ -270,7 +270,7 @@ struct MergeTreeDataPart
~MergeTreeDataPart();
/// Calculate the total size of the entire directory with all the files
static size_t calcTotalSize(const String & from);
static size_t calculateTotalSize(const String & from);
void remove() const;

View File

@ -351,7 +351,7 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
new_part->columns = *total_column_list;
new_part->index.swap(index_columns);
new_part->checksums = checksums;
new_part->size_in_bytes = MergeTreeData::DataPart::calcTotalSize(new_part->getFullPath());
new_part->size_in_bytes = MergeTreeData::DataPart::calculateTotalSize(new_part->getFullPath());
}
void MergedBlockOutputStream::init()

View File

@ -526,13 +526,14 @@ const StorageLog::Marks & StorageLog::getMarksWithRealRowCount() const
String filename;
/** We take marks from first column.
* If this is an array, then we take the marks corresponding to the sizes, and not to the internals of the arrays.
* If this is a data type with multiple streams, get the first stream, which we assume has the real row count.
* (Example: for the Array data type, the first stream is the array sizes, and the number of array sizes is the number of arrays.)
*/
if (typeid_cast<const DataTypeArray *>(&column_type))
filename = DataTypeNested::extractNestedTableName(column_name) + ARRAY_SIZES_COLUMN_NAME_SUFFIX "0";
else
filename = column_name;
column_type.enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
{
if (filename.empty())
filename = IDataType::getFileNameForStream(column_name, substream_path);
}, {});
Files_t::const_iterator it = files.find(filename);
if (files.end() == it)