clickhouse: fixed a leak of file descriptors and memory. [#METR-9664]

This commit is contained in:
Michael Kolupaev 2014-01-15 16:12:48 +00:00
parent 532945a535
commit fd05f7ea53
3 changed files with 28 additions and 27 deletions

View File

@ -221,8 +221,9 @@ private:
String escaped_size_name = escapeForFileName(DataTypeNested::extractNestedTableName(name)) String escaped_size_name = escapeForFileName(DataTypeNested::extractNestedTableName(name))
+ ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level); + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
streams.insert(std::make_pair(size_name, new Stream( if (!streams.count(size_name))
path + escaped_size_name, uncompressed_cache))); streams.insert(std::make_pair(size_name, new Stream(
path + escaped_size_name, uncompressed_cache)));
addStream(name, *type_arr->getNestedType(), level + 1); addStream(name, *type_arr->getNestedType(), level + 1);
} }
@ -231,16 +232,14 @@ private:
String size_name = name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level); String size_name = name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
String escaped_size_name = escaped_column_name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level); String escaped_size_name = escaped_column_name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
streams.insert(std::make_pair(size_name, new Stream( streams[size_name] = new Stream(path + escaped_size_name, uncompressed_cache);
path + escaped_size_name, uncompressed_cache)));
const NamesAndTypesList & columns = *type_nested->getNestedTypesList(); const NamesAndTypesList & columns = *type_nested->getNestedTypesList();
for (NamesAndTypesList::const_iterator it = columns.begin(); it != columns.end(); ++it) for (NamesAndTypesList::const_iterator it = columns.begin(); it != columns.end(); ++it)
addStream(DataTypeNested::concatenateNestedName(name, it->first), *it->second, level + 1); addStream(DataTypeNested::concatenateNestedName(name, it->first), *it->second, level + 1);
} }
else else
streams.insert(std::make_pair(name, new Stream( streams[name] = new Stream(path + escaped_column_name, uncompressed_cache);
path + escaped_column_name, uncompressed_cache)));
} }
void readData(const String & name, const IDataType & type, IColumn & column, size_t from_mark, size_t max_rows_to_read, void readData(const String & name, const IDataType & type, IColumn & column, size_t from_mark, size_t max_rows_to_read,

View File

@ -109,33 +109,34 @@ void LogBlockInputStream::addStream(const String & name, const IDataType & type,
if (const DataTypeArray * type_arr = dynamic_cast<const DataTypeArray *>(&type)) if (const DataTypeArray * type_arr = dynamic_cast<const DataTypeArray *>(&type))
{ {
String size_name = DataTypeNested::extractNestedTableName(name) + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level); String size_name = DataTypeNested::extractNestedTableName(name) + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
streams.insert(std::make_pair(size_name, new Stream( if (!streams.count(size_name))
storage.files[size_name].data_file.path(), streams.insert(std::make_pair(size_name, new Stream(
mark_number storage.files[size_name].data_file.path(),
? storage.files[size_name].marks[mark_number].offset mark_number
: 0))); ? storage.files[size_name].marks[mark_number].offset
: 0)));
addStream(name, *type_arr->getNestedType(), level + 1); addStream(name, *type_arr->getNestedType(), level + 1);
} }
else if (const DataTypeNested * type_nested = dynamic_cast<const DataTypeNested *>(&type)) else if (const DataTypeNested * type_nested = dynamic_cast<const DataTypeNested *>(&type))
{ {
String size_name = name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level); String size_name = name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
streams.insert(std::make_pair(size_name, new Stream( streams[size_name] = new Stream(
storage.files[size_name].data_file.path(), storage.files[size_name].data_file.path(),
mark_number mark_number
? storage.files[size_name].marks[mark_number].offset ? storage.files[size_name].marks[mark_number].offset
: 0))); : 0);
const NamesAndTypesList & columns = *type_nested->getNestedTypesList(); const NamesAndTypesList & columns = *type_nested->getNestedTypesList();
for (NamesAndTypesList::const_iterator it = columns.begin(); it != columns.end(); ++it) for (NamesAndTypesList::const_iterator it = columns.begin(); it != columns.end(); ++it)
addStream(DataTypeNested::concatenateNestedName(name, it->first), *it->second, level + 1); addStream(DataTypeNested::concatenateNestedName(name, it->first), *it->second, level + 1);
} }
else else
streams.insert(std::make_pair(name, new Stream( streams[name] = new Stream(
storage.files[name].data_file.path(), storage.files[name].data_file.path(),
mark_number mark_number
? storage.files[name].marks[mark_number].offset ? storage.files[name].marks[mark_number].offset
: 0))); : 0);
} }
@ -234,24 +235,23 @@ void LogBlockOutputStream::addStream(const String & name, const IDataType & type
if (const DataTypeArray * type_arr = dynamic_cast<const DataTypeArray *>(&type)) if (const DataTypeArray * type_arr = dynamic_cast<const DataTypeArray *>(&type))
{ {
String size_name = DataTypeNested::extractNestedTableName(name) + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level); String size_name = DataTypeNested::extractNestedTableName(name) + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
streams.insert(std::make_pair(size_name, new Stream( if (!streams.count(size_name))
storage.files[size_name].data_file.path()))); streams.insert(std::make_pair(size_name, new Stream(
storage.files[size_name].data_file.path())));
addStream(name, *type_arr->getNestedType(), level + 1); addStream(name, *type_arr->getNestedType(), level + 1);
} }
else if (const DataTypeNested * type_nested = dynamic_cast<const DataTypeNested *>(&type)) else if (const DataTypeNested * type_nested = dynamic_cast<const DataTypeNested *>(&type))
{ {
String size_name = name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level); String size_name = name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
streams.insert(std::make_pair(size_name, new Stream( streams[size_name] = new Stream(storage.files[size_name].data_file.path());
storage.files[size_name].data_file.path())));
const NamesAndTypesList & columns = *type_nested->getNestedTypesList(); const NamesAndTypesList & columns = *type_nested->getNestedTypesList();
for (NamesAndTypesList::const_iterator it = columns.begin(); it != columns.end(); ++it) for (NamesAndTypesList::const_iterator it = columns.begin(); it != columns.end(); ++it)
addStream(DataTypeNested::concatenateNestedName(name, it->first), *it->second, level + 1); addStream(DataTypeNested::concatenateNestedName(name, it->first), *it->second, level + 1);
} }
else else
streams.insert(std::make_pair(name, new Stream( streams[name] = new Stream(storage.files[name].data_file.path());
storage.files[name].data_file.path())));
} }

View File

@ -102,21 +102,22 @@ void TinyLogBlockInputStream::addStream(const String & name, const IDataType & t
if (const DataTypeArray * type_arr = dynamic_cast<const DataTypeArray *>(&type)) if (const DataTypeArray * type_arr = dynamic_cast<const DataTypeArray *>(&type))
{ {
String size_name = DataTypeNested::extractNestedTableName(name) + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level); String size_name = DataTypeNested::extractNestedTableName(name) + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
streams.insert(std::make_pair(size_name, new Stream(storage.files[size_name].data_file.path()))); if (!streams.count(size_name))
streams.insert(std::make_pair(size_name, new Stream(storage.files[size_name].data_file.path())));
addStream(name, *type_arr->getNestedType(), level + 1); addStream(name, *type_arr->getNestedType(), level + 1);
} }
else if (const DataTypeNested * type_nested = dynamic_cast<const DataTypeNested *>(&type)) else if (const DataTypeNested * type_nested = dynamic_cast<const DataTypeNested *>(&type))
{ {
String size_name = name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level); String size_name = name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
streams.insert(std::make_pair(size_name, new Stream(storage.files[size_name].data_file.path()))); streams[size_name] = new Stream(storage.files[size_name].data_file.path());
const NamesAndTypesList & columns = *type_nested->getNestedTypesList(); const NamesAndTypesList & columns = *type_nested->getNestedTypesList();
for (NamesAndTypesList::const_iterator it = columns.begin(); it != columns.end(); ++it) for (NamesAndTypesList::const_iterator it = columns.begin(); it != columns.end(); ++it)
addStream(DataTypeNested::concatenateNestedName(name, it->first), *it->second, level + 1); addStream(DataTypeNested::concatenateNestedName(name, it->first), *it->second, level + 1);
} }
else else
streams.insert(std::make_pair(name, new Stream(storage.files[name].data_file.path()))); streams[name] = new Stream(storage.files[name].data_file.path());
} }
@ -185,21 +186,22 @@ void TinyLogBlockOutputStream::addStream(const String & name, const IDataType &
if (const DataTypeArray * type_arr = dynamic_cast<const DataTypeArray *>(&type)) if (const DataTypeArray * type_arr = dynamic_cast<const DataTypeArray *>(&type))
{ {
String size_name = DataTypeNested::extractNestedTableName(name) + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level); String size_name = DataTypeNested::extractNestedTableName(name) + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
streams.insert(std::make_pair(size_name, new Stream(storage.files[size_name].data_file.path()))); if (streams.count(size_name))
streams.insert(std::make_pair(size_name, new Stream(storage.files[size_name].data_file.path())));
addStream(name, *type_arr->getNestedType(), level + 1); addStream(name, *type_arr->getNestedType(), level + 1);
} }
else if (const DataTypeNested * type_nested = dynamic_cast<const DataTypeNested *>(&type)) else if (const DataTypeNested * type_nested = dynamic_cast<const DataTypeNested *>(&type))
{ {
String size_name = name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level); String size_name = name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
streams.insert(std::make_pair(size_name, new Stream(storage.files[size_name].data_file.path()))); streams[size_name] = new Stream(storage.files[size_name].data_file.path());
const NamesAndTypesList & columns = *type_nested->getNestedTypesList(); const NamesAndTypesList & columns = *type_nested->getNestedTypesList();
for (NamesAndTypesList::const_iterator it = columns.begin(); it != columns.end(); ++it) for (NamesAndTypesList::const_iterator it = columns.begin(); it != columns.end(); ++it)
addStream(DataTypeNested::concatenateNestedName(name, it->first), *it->second, level + 1); addStream(DataTypeNested::concatenateNestedName(name, it->first), *it->second, level + 1);
} }
else else
streams.insert(std::make_pair(name, new Stream(storage.files[name].data_file.path()))); streams[name] = new Stream(storage.files[name].data_file.path());
} }