Sergey Magidovich 2015-04-03 13:51:12 +03:00
commit 7647a17bb9
27 changed files with 550 additions and 266 deletions

View File

@ -30,11 +30,12 @@ How to prepare data
Prepare the dumps with the script create_dump.sh for the tables hits_10m, hits_100m and hits_1000m. This takes about 5 hours (1m41.882s, 25m11.103s and 276m36.388s respectively).
Start the vsql command-line client.
/opt/vertica/bin/vsql -U dbadmin
Create the tables using the queries from hits_define_schema.sql.
Time to insert data:
hits_10m: 91 sec.
hits_100m: 774 sec.
hits_1000m:
hits_1000m: 13769 sec.
Validate the number of rows with SELECT count(*).

View File

@ -2,16 +2,16 @@ SELECT count(*) FROM {table};
SELECT count(*) FROM {table} WHERE AdvEngineID != 0;
SELECT sum(AdvEngineID), count(*), avg(ResolutionWidth) FROM {table};
SELECT sum_float(UserID) FROM {table};
SELECT count(DISTINCT UserID) FROM {table};
SELECT count(DISTINCT SearchPhrase) FROM {table};
SELECT COUNT(DISTINCT UserID) FROM {table};
SELECT COUNT(DISTINCT SearchPhrase) FROM {table};
SELECT min(EventDate), max(EventDate) FROM {table};
SELECT AdvEngineID, count(*) FROM {table} WHERE AdvEngineID != 0 GROUP BY AdvEngineID ORDER BY count(*) DESC;
SELECT RegionID, count(DISTINCT UserID) AS u FROM {table} GROUP BY RegionID ORDER BY u DESC LIMIT 10;
SELECT RegionID, sum(AdvEngineID), count(*) AS c, avg(ResolutionWidth), count(DISTINCT UserID) FROM {table} GROUP BY RegionID ORDER BY count(*) DESC LIMIT 10;
SELECT MobilePhoneModel, count(DISTINCT UserID) AS u FROM {table} WHERE MobilePhoneModel != '' GROUP BY MobilePhoneModel ORDER BY u DESC LIMIT 10;
SELECT MobilePhone, MobilePhoneModel, count(DISTINCT UserID) AS u FROM {table} WHERE MobilePhoneModel != '' GROUP BY MobilePhone, MobilePhoneModel ORDER BY u DESC LIMIT 10;
SELECT RegionID, COUNT(DISTINCT UserID) AS u FROM {table} GROUP BY RegionID ORDER BY u DESC LIMIT 10;
SELECT RegionID, sum(AdvEngineID), count(*) AS c, avg(ResolutionWidth), COUNT(DISTINCT UserID) FROM {table} GROUP BY RegionID ORDER BY count(*) DESC LIMIT 10;
SELECT MobilePhoneModel, COUNT(DISTINCT UserID) AS u FROM {table} WHERE MobilePhoneModel != '' GROUP BY MobilePhoneModel ORDER BY u DESC LIMIT 10;
SELECT MobilePhone, MobilePhoneModel, COUNT(DISTINCT UserID) AS u FROM {table} WHERE MobilePhoneModel != '' GROUP BY MobilePhone, MobilePhoneModel ORDER BY u DESC LIMIT 10;
SELECT SearchPhrase, count(*) FROM {table} WHERE SearchPhrase != '' GROUP BY SearchPhrase ORDER BY count(*) DESC LIMIT 10;
SELECT SearchPhrase, count(DISTINCT UserID) AS u FROM {table} WHERE SearchPhrase != '' GROUP BY SearchPhrase ORDER BY u DESC LIMIT 10;
SELECT SearchPhrase, COUNT(DISTINCT UserID) AS u FROM {table} WHERE SearchPhrase != '' GROUP BY SearchPhrase ORDER BY u DESC LIMIT 10;
SELECT SearchEngineID, SearchPhrase, count(*) FROM {table} WHERE SearchPhrase != '' GROUP BY SearchEngineID, SearchPhrase ORDER BY count(*) DESC LIMIT 10;
SELECT UserID, count(*) FROM {table} GROUP BY UserID ORDER BY count(*) DESC LIMIT 10;
SELECT UserID, SearchPhrase, count(*) FROM {table} GROUP BY UserID, SearchPhrase ORDER BY count(*) DESC LIMIT 10;
@ -20,13 +20,13 @@ SELECT UserID, Minute(EventTime) AS m, SearchPhrase, count(*) FROM {table} GROUP
SELECT UserID FROM {table} WHERE UserID = 12345678901234567890;
SELECT count(*) FROM {table} WHERE URL LIKE '%metrika%';
SELECT SearchPhrase, MAX(URL), count(*) FROM {table} WHERE URL LIKE '%metrika%' AND SearchPhrase != '' GROUP BY SearchPhrase ORDER BY count(*) DESC LIMIT 10;
SELECT SearchPhrase, MAX(URL), MAX(Title), count(*) AS c, count(DISTINCT UserID) FROM {table} WHERE Title LIKE '%Яндекс%' AND URL NOT LIKE '%.yandex.%' AND SearchPhrase != '' GROUP BY SearchPhrase ORDER BY count(*) DESC LIMIT 10;
SELECT SearchPhrase, MAX(URL), MAX(Title), count(*) AS c, COUNT(DISTINCT UserID) FROM {table} WHERE Title LIKE '%Яндекс%' AND URL NOT LIKE '%.yandex.%' AND SearchPhrase != '' GROUP BY SearchPhrase ORDER BY count(*) DESC LIMIT 10;
SELECT * FROM {table} WHERE URL LIKE '%metrika%' ORDER BY EventTime LIMIT 10;
SELECT SearchPhrase FROM {table} WHERE SearchPhrase != '' ORDER BY EventTime LIMIT 10;
SELECT SearchPhrase FROM {table} WHERE SearchPhrase != '' ORDER BY SearchPhrase LIMIT 10;
SELECT SearchPhrase FROM {table} WHERE SearchPhrase != '' ORDER BY EventTime, SearchPhrase LIMIT 10;
SELECT CounterID, avg(length(URL)) AS l, count(*) FROM {table} WHERE URL != '' GROUP BY CounterID HAVING count(*) > 100000 ORDER BY l DESC LIMIT 25;
SELECT SUBSTRING(SUBSTRING(Referer, POSITION('//' IN Referer) + 2), 1, GREATEST(0, POSITION('/' IN SUBSTRING(Referer, POSITION('//' IN Referer) + 2)) - 1)) AS key, avg(length(Referer)) AS l, count(*) AS c, MAX(Referer) FROM {table} WHERE Referer != '' GROUP BY key HAVING count(*) > 100000 ORDER BY l DESC LIMIT 25;
SELECT CounterID, avg(OCTET_LENGTH(URL)) AS l, count(*) FROM {table} WHERE URL != '' GROUP BY CounterID HAVING count(*) > 100000 ORDER BY l DESC LIMIT 25;
SELECT SUBSTRB(SUBSTRB(Referer, POSITIONB(Referer, '//') + 2), 1, GREATEST(0, POSITIONB(SUBSTRB(Referer, POSITIONB(Referer, '//') + 2), '/') - 1)) AS key, avg(OCTET_LENGTH(Referer)) AS l, count(*) AS c, MAX(Referer) FROM {table} WHERE Referer != '' GROUP BY key HAVING count(*) > 100000 ORDER BY l DESC LIMIT 25;
SELECT sum(ResolutionWidth), sum(ResolutionWidth + 1), sum(ResolutionWidth + 2), sum(ResolutionWidth + 3), sum(ResolutionWidth + 4), sum(ResolutionWidth + 5), sum(ResolutionWidth + 6), sum(ResolutionWidth + 7), sum(ResolutionWidth + 8), sum(ResolutionWidth + 9), sum(ResolutionWidth + 10), sum(ResolutionWidth + 11), sum(ResolutionWidth + 12), sum(ResolutionWidth + 13), sum(ResolutionWidth + 14), sum(ResolutionWidth + 15), sum(ResolutionWidth + 16), sum(ResolutionWidth + 17), sum(ResolutionWidth + 18), sum(ResolutionWidth + 19), sum(ResolutionWidth + 20), sum(ResolutionWidth + 21), sum(ResolutionWidth + 22), sum(ResolutionWidth + 23), sum(ResolutionWidth + 24), sum(ResolutionWidth + 25), sum(ResolutionWidth + 26), sum(ResolutionWidth + 27), sum(ResolutionWidth + 28), sum(ResolutionWidth + 29), sum(ResolutionWidth + 30), sum(ResolutionWidth + 31), sum(ResolutionWidth + 32), sum(ResolutionWidth + 33), sum(ResolutionWidth + 34), sum(ResolutionWidth + 35), sum(ResolutionWidth + 36), sum(ResolutionWidth + 37), sum(ResolutionWidth + 38), sum(ResolutionWidth + 39), sum(ResolutionWidth + 40), sum(ResolutionWidth + 41), sum(ResolutionWidth + 42), sum(ResolutionWidth + 43), sum(ResolutionWidth + 44), sum(ResolutionWidth + 45), sum(ResolutionWidth + 46), sum(ResolutionWidth + 47), sum(ResolutionWidth + 48), sum(ResolutionWidth + 49), sum(ResolutionWidth + 50), sum(ResolutionWidth + 51), sum(ResolutionWidth + 52), sum(ResolutionWidth + 53), sum(ResolutionWidth + 54), sum(ResolutionWidth + 55), sum(ResolutionWidth + 56), sum(ResolutionWidth + 57), sum(ResolutionWidth + 58), sum(ResolutionWidth + 59), sum(ResolutionWidth + 60), sum(ResolutionWidth + 61), sum(ResolutionWidth + 62), sum(ResolutionWidth + 63), sum(ResolutionWidth + 64), sum(ResolutionWidth + 65), sum(ResolutionWidth + 66), sum(ResolutionWidth + 67), sum(ResolutionWidth + 68), sum(ResolutionWidth + 69), sum(ResolutionWidth + 70), sum(ResolutionWidth + 71), sum(ResolutionWidth + 72), sum(ResolutionWidth + 73), sum(ResolutionWidth + 74), sum(ResolutionWidth + 75), sum(ResolutionWidth + 76), sum(ResolutionWidth + 77), sum(ResolutionWidth + 78), sum(ResolutionWidth + 79), sum(ResolutionWidth + 80), sum(ResolutionWidth + 81), sum(ResolutionWidth + 82), sum(ResolutionWidth + 83), sum(ResolutionWidth + 84), sum(ResolutionWidth + 85), sum(ResolutionWidth + 86), sum(ResolutionWidth + 87), sum(ResolutionWidth + 88), sum(ResolutionWidth + 89) FROM {table};
SELECT SearchEngineID, ClientIP, count(*) AS c, sum(Refresh), avg(ResolutionWidth) FROM {table} WHERE SearchPhrase != '' GROUP BY SearchEngineID, ClientIP ORDER BY count(*) DESC LIMIT 10;
SELECT WatchID, ClientIP, count(*) AS c, sum(Refresh), avg(ResolutionWidth) FROM {table} WHERE SearchPhrase != '' GROUP BY WatchID, ClientIP ORDER BY count(*) DESC LIMIT 10;
@ -34,10 +34,10 @@ SELECT WatchID, ClientIP, count(*) AS c, sum(Refresh), avg(ResolutionWidth) FROM
SELECT URL, count(*) FROM {table} GROUP BY URL ORDER BY count(*) DESC LIMIT 10;
SELECT 1, URL, count(*) FROM {table} GROUP BY 1, URL ORDER BY count(*) DESC LIMIT 10;
SELECT ClientIP, ClientIP - 1, ClientIP - 2, ClientIP - 3, count(*) FROM {table} GROUP BY ClientIP, ClientIP - 1, ClientIP - 2, ClientIP - 3 ORDER BY count(*) DESC LIMIT 10;
SELECT URL, count(*) AS PageViews FROM {table} WHERE CounterID = 34 AND EventDate >= DATE('2013-07-01') AND EventDate <= DATE('2013-07-31') AND NOT DontCountHits AND NOT Refresh AND URL != '' GROUP BY URL ORDER BY PageViews DESC LIMIT 10;
SELECT URL, count(*) AS PageViews FROM {table} WHERE CounterID = 34 AND EventDate >= DATE('2013-07-01') AND EventDate <= DATE('2013-07-31') AND NOT DontCountHits AND NOT Refresh AND URL != '' GROUP BY URL ORDER BY PageViews DESC LIMIT 10;
SELECT Title, count(*) AS PageViews FROM {table} WHERE CounterID = 34 AND EventDate >= DATE('2013-07-01') AND EventDate <= DATE('2013-07-31') AND NOT DontCountHits AND NOT Refresh AND Title != '' GROUP BY Title ORDER BY PageViews DESC LIMIT 10;
SELECT URL, count(*) AS PageViews FROM {table} WHERE CounterID = 34 AND EventDate >= DATE('2013-07-01') AND EventDate <= DATE('2013-07-31') AND NOT Refresh AND IsLink AND NOT IsDownload GROUP BY URL ORDER BY PageViews DESC LIMIT 1000;
SELECT TraficSourceID, SearchEngineID, AdvEngineID, CASE WHEN SearchEngineID = 0 AND AdvEngineID = 0 THEN Referer ELSE '' END AS Src, URL AS Dst, count(*) AS PageViews FROM {table} WHERE CounterID = 34 AND EventDate >= DATE('2013-07-01') AND EventDate <= DATE('2013-07-31') AND NOT Refresh GROUP BY TraficSourceID, SearchEngineID, AdvEngineID, Src, Dst ORDER BY PageViews DESC LIMIT 1000;
SELECT URLHash, EventDate, count(*) AS PageViews FROM {table} WHERE CounterID = 34 AND EventDate >= DATE('2013-07-01') AND EventDate <= DATE('2013-07-31') AND NOT Refresh AND TraficSourceID IN (-1, 6) AND RefererHash = 6202628419148573758 GROUP BY URLHash, EventDate ORDER BY PageViews DESC LIMIT 100000;
SELECT WindowClientWidth, WindowClientHeight, count(*) AS PageViews FROM {table} WHERE CounterID = 34 AND EventDate >= DATE('2013-07-01') AND EventDate <= DATE('2013-07-31') AND NOT Refresh AND NOT DontCountHits AND URLHash = 6202628419148573758 GROUP BY WindowClientWidth, WindowClientHeight ORDER BY PageViews DESC LIMIT 10000;
SELECT TraficSourceID, SearchEngineID, AdvEngineID, CASE WHEN SearchEngineID = 0 AND AdvEngineID = 0 THEN Referer ELSE '' END AS Src, URL AS Dst, count(*) AS PageViews FROM {table} WHERE CounterID = 34 AND EventDate >= DATE('2013-07-01') AND EventDate <= DATE('2013-07-31') AND NOT Refresh GROUP BY TraficSourceID, SearchEngineID, AdvEngineID, Src, Dst ORDER BY PageViews DESC LIMIT 1000;
SELECT URLHash, EventDate, count(*) AS PageViews FROM {table} WHERE CounterID = 34 AND EventDate >= DATE('2013-07-01') AND EventDate <= DATE('2013-07-31') AND NOT Refresh AND TraficSourceID IN (-1, 6) AND RefererHash = 6202628419148573758 GROUP BY URLHash, EventDate ORDER BY PageViews DESC LIMIT 100000;
SELECT WindowClientWidth, WindowClientHeight, count(*) AS PageViews FROM {table} WHERE CounterID = 34 AND EventDate >= DATE('2013-07-01') AND EventDate <= DATE('2013-07-31') AND NOT Refresh AND NOT DontCountHits AND URLHash = 6202628419148573758 GROUP BY WindowClientWidth, WindowClientHeight ORDER BY PageViews DESC LIMIT 10000;
SELECT TIME_SLICE(EventTime, 1, 'MINUTE') AS Minute, count(*) AS PageViews FROM {table} WHERE CounterID = 34 AND EventDate >= DATE('2013-07-01') AND EventDate <= DATE('2013-07-02') AND NOT Refresh AND NOT DontCountHits GROUP BY Minute ORDER BY Minute;

View File

@ -11,6 +11,7 @@
#include <DB/Columns/ColumnFixedString.h>
#include <DB/Columns/ColumnConst.h>
#include <DB/Functions/IFunction.h>
#include <statdaemons/ext/range.hpp>
namespace DB
@ -498,10 +499,11 @@ public:
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
if (!arguments[1].column)
throw Exception("Second argument for function " + getName() + " must be constant", ErrorCodes::ILLEGAL_COLUMN);
if (!typeid_cast<const DataTypeString *>(&*arguments[0].type))
throw Exception(getName() + " is only implemented for type String", ErrorCodes::NOT_IMPLEMENTED);
if (!typeid_cast<const DataTypeString *>(arguments[0].type.get()) &&
!typeid_cast<const DataTypeFixedString *>(arguments[0].type.get()))
throw Exception(getName() + " is only implemented for types String and FixedString", ErrorCodes::NOT_IMPLEMENTED);
size_t n = getSize(arguments[1]);
const size_t n = getSize(arguments[1]);
out_return_type = new DataTypeFixedString(n);
}
@ -523,7 +525,7 @@ public:
block.getByPosition(result).column = new ColumnConst<String>(column_const->size(), std::move(resized_string), new DataTypeFixedString(n));
}
else if(const ColumnString * column_string = typeid_cast<const ColumnString *>(&*column))
else if (const ColumnString * column_string = typeid_cast<const ColumnString *>(&*column))
{
ColumnFixedString * column_fixed = new ColumnFixedString(n);
ColumnPtr result_ptr = column_fixed;
@ -542,6 +544,26 @@ public:
}
block.getByPosition(result).column = result_ptr;
}
else if (const auto column_fixed_string = typeid_cast<const ColumnFixedString *>(column.get()))
{
const auto src_n = column_fixed_string->getN();
if (src_n > n)
throw Exception{
"String too long for type FixedString(" + toString(n) + ")",
ErrorCodes::TOO_LARGE_STRING_SIZE
};
const auto column_fixed = new ColumnFixedString{n};
block.getByPosition(result).column = column_fixed;
auto & out_chars = column_fixed->getChars();
const auto & in_chars = column_fixed_string->getChars();
const auto size = column_fixed_string->size();
out_chars.resize_fill(size * n);
for (const auto i : ext::range(0, size))
memcpy(&out_chars[i * n], &in_chars[i * src_n], src_n);
}
else
throw Exception("Unexpected column: " + column->getName(), ErrorCodes::ILLEGAL_COLUMN);
}
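The new branch above converts a FixedString(src_n) column to a wider FixedString(n) by zero-filling the destination and copying each value into its slot. A minimal standalone sketch of that padding copy, with illustrative names rather than the commit's column classes:

#include <cstring>
#include <cstdint>
#include <vector>

// Sketch: widen each FixedString(src_n) value to FixedString(n), n >= src_n.
// out is pre-filled with zero bytes (the resize_fill analogue), so the tail
// of every destination slot is zero padding.
std::vector<uint8_t> widenFixedStrings(const std::vector<uint8_t> & in, size_t src_n, size_t n, size_t rows)
{
    std::vector<uint8_t> out(rows * n, 0);
    for (size_t i = 0; i < rows; ++i)
        std::memcpy(&out[i * n], &in[i * src_n], src_n);
    return out;
}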

View File

@ -349,29 +349,49 @@ inline void readDateText(mysqlxx::Date & date, ReadBuffer & buf)
}
/// In YYYY-MM-DD HH:MM:SS format, according to the current time zone
template <typename T>
inline T parse(const char * data, size_t size);
void readDateTimeTextFallback(time_t & datetime, ReadBuffer & buf);
/** In YYYY-MM-DD hh:mm:ss format, according to the current time zone.
* As an exception, parsing from a decimal number (a unix timestamp) is also supported.
*/
inline void readDateTimeText(time_t & datetime, ReadBuffer & buf)
{
char s[19];
size_t size = buf.read(s, 19);
if (19 != size)
/** Read 10 characters, which could be a unix timestamp.
* Note that only 10-character unix timestamps are supported, i.e. from September 9, 2001 onwards.
* Then look at the fifth character. If it is a digit, parse a unix timestamp.
* If it is not a digit, parse YYYY-MM-DD hh:mm:ss.
*/
/// Optimistic path: the whole value is certain to be inside the buffer.
const char * s = buf.position();
if (s + 19 < buf.buffer().end())
{
s[size] = 0;
throw Exception(std::string("Cannot parse datetime ") + s, ErrorCodes::CANNOT_PARSE_DATETIME);
if (s[4] < '0' || s[4] > '9')
{
UInt16 year = (s[0] - '0') * 1000 + (s[1] - '0') * 100 + (s[2] - '0') * 10 + (s[3] - '0');
UInt8 month = (s[5] - '0') * 10 + (s[6] - '0');
UInt8 day = (s[8] - '0') * 10 + (s[9] - '0');
UInt8 hour = (s[11] - '0') * 10 + (s[12] - '0');
UInt8 minute = (s[14] - '0') * 10 + (s[15] - '0');
UInt8 second = (s[17] - '0') * 10 + (s[18] - '0');
if (unlikely(year == 0))
datetime = 0;
else
datetime = DateLUT::instance().makeDateTime(year, month, day, hour, minute, second);
buf.position() += 19;
}
else
readIntTextUnsafe(datetime, buf);
}
UInt16 year = (s[0] - '0') * 1000 + (s[1] - '0') * 100 + (s[2] - '0') * 10 + (s[3] - '0');
UInt8 month = (s[5] - '0') * 10 + (s[6] - '0');
UInt8 day = (s[8] - '0') * 10 + (s[9] - '0');
UInt8 hour = (s[11] - '0') * 10 + (s[12] - '0');
UInt8 minute = (s[14] - '0') * 10 + (s[15] - '0');
UInt8 second = (s[17] - '0') * 10 + (s[18] - '0');
if (unlikely(year == 0))
datetime = 0;
else
datetime = DateLUT::instance().makeDateTime(year, month, day, hour, minute, second);
readDateTimeTextFallback(datetime, buf);
}
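The dispatch on the fifth character works because both accepted inputs are fixed-width up to that point: in YYYY-MM-DD hh:mm:ss the fifth character is '-', while a 10-digit unix timestamp (any value from 1000000000, i.e. September 9, 2001, onwards) has a digit there. A hedged standalone sketch of just that check (hypothetical helper, not part of the commit):

// Returns true if the buffer looks like a 10-digit unix timestamp rather
// than YYYY-MM-DD hh:mm:ss: only the fifth character needs to be examined.
inline bool looksLikeUnixTimestamp(const char * s, size_t size)
{
    return size >= 5 && s[4] >= '0' && s[4] <= '9';
}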
inline void readDateTimeText(mysqlxx::DateTime & datetime, ReadBuffer & buf)

View File

@ -262,9 +262,8 @@ public:
const FormatFactory & getFormatFactory() const { return shared->format_factory; }
const Dictionaries & getDictionaries() const;
const ExternalDictionaries & getExternalDictionaries() const;
void tryCreateDictionaries(bool throw_on_error = false) const;
void tryCreateExternalDictionaries(bool throw_on_error = false) const;
void tryCreateDictionaries() const;
void tryCreateExternalDictionaries() const;
InterserverIOHandler & getInterserverIOHandler() { return shared->interserver_io_handler; }
@ -342,6 +341,10 @@ public:
Compiler & getCompiler();
void shutdown() { shared->shutdown(); }
private:
const Dictionaries & getDictionariesImpl(bool throw_on_error) const;
const ExternalDictionaries & getExternalDictionariesImpl(bool throw_on_error) const;
};

View File

@ -284,3 +284,27 @@ void NO_INLINE Aggregator::executeSpecializedWithoutKey(
}
}
/** The main code is compiled with gcc 4.9.
* But SpecializedAggregator is compiled with clang 3.6 into a .so file.
* This is done because in this particular case gcc cannot be convinced to inline
* functions that have been devirtualized, and performance ends up lower.
* Also, clang is easier to distribute for deployment to the servers.
*
* After switching from gcc 4.8 and gnu++1x to gcc 4.9 and gnu++1y,
* dlopen started failing with the error: undefined symbol: __cxa_pure_virtual
*
* Most likely this happens because the version of this symbol changed:
* gcc creates the symbol
* U __cxa_pure_virtual@@CXXABI_1.3
* in the .so, whereas clang creates the symbol
* U __cxa_pure_virtual
*
* But it does not matter to us how the function __cxa_pure_virtual is implemented,
* because it is not called during normal program operation,
* and if it is called, the program is guaranteed to be buggy anyway.
*
* Therefore, we can work around the problem like this:
*/
extern "C" void __attribute__((__visibility__("default"), __noreturn__)) __cxa_pure_virtual() { abort(); };

View File

@ -30,26 +30,39 @@ public:
{
std::reverse(remaining_mark_ranges.begin(), remaining_mark_ranges.end());
/// inject columns required for defaults evaluation
const auto injected_columns = injectRequiredColumns(column_names);
/// insert injected columns into ordered columns list to avoid exception about different block structures
ordered_names.insert(std::end(ordered_names), std::begin(injected_columns), std::end(injected_columns));
Names pre_column_names;
if (prewhere_actions)
{
pre_column_names = prewhere_actions->getRequiredColumns();
/// @todo somehow decide which injected columns belong to PREWHERE, optimizing reads
pre_column_names.insert(std::end(pre_column_names),
std::begin(injected_columns), std::end(injected_columns));
if (pre_column_names.empty())
pre_column_names.push_back(column_names[0]);
NameSet pre_name_set(pre_column_names.begin(), pre_column_names.end());
const NameSet pre_name_set(pre_column_names.begin(), pre_column_names.end());
/// If the expression in PREWHERE is not a table column, we should not expose that column to the caller
/// (the storage is expected to return only table columns).
remove_prewhere_column = !pre_name_set.count(prewhere_column);
Names post_column_names;
for (const auto & name : column_names)
{
if (!pre_name_set.count(name))
post_column_names.push_back(name);
}
column_names = post_column_names;
}
column_name_set.insert(column_names.begin(), column_names.end());
/// will be used to distinguish between PREWHERE and WHERE columns when applying filter
column_name_set = NameSet{column_names.begin(), column_names.end()};
if (check_columns)
{
@ -111,47 +124,53 @@ protected:
/// We will call progressImpl ourselves.
void progress(const Progress & value) override {}
void injectRequiredColumns(NamesAndTypesList & columns) const {
std::set<NameAndTypePair> required_columns;
auto modified = false;
for (auto it = std::begin(columns); it != std::end(columns);)
/** If some of the requested columns are not present in the part,
* then figure out which columns may additionally need to be read
* so that the DEFAULT expressions for those columns can be evaluated.
* Adds them to columns.
*/
NameSet injectRequiredColumns(Names & columns) const
{
NameSet required_columns{std::begin(columns), std::end(columns)};
NameSet injected_columns;
for (size_t i = 0; i < columns.size(); ++i)
{
required_columns.emplace(*it);
const auto & column_name = columns[i];
if (!owned_data_part->hasColumnFiles(it->name))
/// column has files and hence does not require evaluation
if (owned_data_part->hasColumnFiles(column_name))
continue;
const auto default_it = storage.column_defaults.find(column_name);
/// column has no explicit default expression
if (default_it == std::end(storage.column_defaults))
continue;
/// collect identifiers required for evaluation
IdentifierNameSet identifiers;
default_it->second.expression->collectIdentifierNames(identifiers);
for (const auto & identifier : identifiers)
{
const auto default_it = storage.column_defaults.find(it->name);
if (default_it != std::end(storage.column_defaults))
if (storage.hasColumn(identifier))
{
IdentifierNameSet identifiers;
default_it->second.expression->collectIdentifierNames(identifiers);
for (const auto & identifier : identifiers)
/// ensure each column is added only once
if (required_columns.count(identifier) == 0)
{
if (storage.hasColumn(identifier))
{
NameAndTypePair column{identifier, storage.getDataTypeByName(identifier)};
if (required_columns.count(column) == 0)
{
it = columns.emplace(++it, std::move(column));
modified = true;
}
}
columns.emplace_back(identifier);
required_columns.emplace(identifier);
injected_columns.emplace(identifier);
}
if (modified)
continue;
}
}
++it;
}
if (modified)
columns = NamesAndTypesList{std::begin(required_columns), std::end(required_columns)};
return injected_columns;
}
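A simplified model of this pass, under stated assumptions: default_deps maps a column name to the identifiers its DEFAULT expression depends on, already filtered to existing table columns, and all names are illustrative rather than the storage API. Each identifier is added at most once, and newly appended names are scanned in turn because the loop runs over the growing vector:

#include <map>
#include <set>
#include <string>
#include <vector>

std::set<std::string> injectRequired(
    std::vector<std::string> & columns,
    const std::set<std::string> & has_files,
    const std::map<std::string, std::vector<std::string>> & default_deps)
{
    std::set<std::string> required(columns.begin(), columns.end());
    std::set<std::string> injected;

    for (size_t i = 0; i < columns.size(); ++i)    // columns may grow during iteration
    {
        if (has_files.count(columns[i]))
            continue;                              // column is on disk, nothing to evaluate

        const auto it = default_deps.find(columns[i]);
        if (it == default_deps.end())
            continue;                              // no explicit default expression

        for (const auto & identifier : it->second)
            if (required.insert(identifier).second)    // each column added only once
            {
                columns.push_back(identifier);
                injected.insert(identifier);
            }
    }

    return injected;
}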
Block readImpl() override
{
Block res;
@ -161,14 +180,12 @@ protected:
if (!reader)
{
injectRequiredColumns(columns);
injectRequiredColumns(pre_columns);
UncompressedCache * uncompressed_cache = use_uncompressed_cache ? storage.context.getUncompressedCache() : nullptr;
UncompressedCache * uncompressed_cache = use_uncompressed_cache ? storage.context.getUncompressedCache() : NULL;
reader.reset(new MergeTreeReader(path, owned_data_part, columns, uncompressed_cache, storage, all_mark_ranges));
if (prewhere_actions)
pre_reader.reset(new MergeTreeReader(path, owned_data_part, pre_columns, uncompressed_cache, storage,
all_mark_ranges));
pre_reader.reset(new MergeTreeReader(path, owned_data_part, pre_columns, uncompressed_cache, storage, all_mark_ranges));
}
if (prewhere_actions)
@ -191,7 +208,7 @@ protected:
if (range.begin == range.end)
remaining_mark_ranges.pop_back();
}
progressImpl(Progress(res.rows(), res.bytes()));
progressImpl(Progress(res.rowsInFirstColumn(), res.bytes()));
pre_reader->fillMissingColumns(res, ordered_names);
/// Compute the expression in PREWHERE.
@ -204,8 +221,8 @@ protected:
size_t pre_bytes = res.bytes();
/** If the filter is a constant (for example, PREWHERE 1 is written),
* then we either return an empty block, or return the block unchanged.
*/
if (ColumnConstUInt8 * column_const = typeid_cast<ColumnConstUInt8 *>(&*column))
{
if (!column_const->getData())
@ -295,7 +312,7 @@ protected:
else
throw Exception("Illegal type " + column->getName() + " of column for filter. Must be ColumnUInt8 or ColumnConstUInt8.", ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER);
reader->fillMissingColumns(res, ordered_names);
reader->fillMissingColumnsAndReorder(res, ordered_names);
}
while (!remaining_mark_ranges.empty() && !res && !isCancelled());
}
@ -315,7 +332,7 @@ protected:
remaining_mark_ranges.pop_back();
}
progressImpl(Progress(res.rows(), res.bytes()));
progressImpl(Progress(res.rowsInFirstColumn(), res.bytes()));
reader->fillMissingColumns(res, ordered_names);
}
@ -356,8 +373,8 @@ private:
Logger * log;
/// requested column names in specific order as expected by other stages
const Names ordered_names;
/// column names in specific order as expected by other stages
Names ordered_names;
};
}

View File

@ -61,7 +61,8 @@ public:
/** If the columns are not present in the block, adds them; if they are present, appends the read values to their ends.
* Does not add columns for which there are no files. To add those, fillMissingColumns must be called.
* The block must contain either none of the columns from columns, or all of those for which files exist.
*/
void readRange(size_t from_mark, size_t to_mark, Block & res)
{
try
@ -128,8 +129,7 @@ public:
}
catch (const Exception & e)
{
if (e.code() != ErrorCodes::ALL_REQUESTED_COLUMNS_ARE_MISSING
&& e.code() != ErrorCodes::MEMORY_LIMIT_EXCEEDED)
if (e.code() != ErrorCodes::MEMORY_LIMIT_EXCEEDED)
{
storage.reportBrokenPart(part_name);
}
@ -187,111 +187,22 @@ public:
added_column = &columns.front();
}
/// Fills the columns that are missing from the block with default values.
/** Adds to the block the missing columns from ordered_names, consisting of default values.
* The missing columns are added at the same positions as in ordered_names.
* If at least one column was added, all columns in the block are reordered as in ordered_names.
*/
void fillMissingColumns(Block & res, const Names & ordered_names)
{
try
{
/** For missing columns from a nested structure, we need to create not a column of empty arrays,
* but a column of arrays of the correct lengths.
* TODO: If, for some nested structure, only the missing columns were requested, empty arrays
* will be returned for them, even if the part contains offsets for that nested structure. This can be fixed.
*/
fillMissingColumnsImpl(res, ordered_names, false);
}
/// First, remember the offset columns for all arrays in the block.
OffsetColumns offset_columns;
for (size_t i = 0; i < res.columns(); ++i)
{
const ColumnWithNameAndType & column = res.getByPosition(i);
if (const ColumnArray * array = typeid_cast<const ColumnArray *>(&*column.column))
{
String offsets_name = DataTypeNested::extractNestedTableName(column.name);
offset_columns[offsets_name] = array->getOffsetsColumn();
}
}
auto should_evaluate_defaults = false;
auto should_sort = false;
for (NamesAndTypesList::const_iterator it = columns.begin(); it != columns.end(); ++it)
{
/// insert default values only for columns without default expressions
if (!res.has(it->name))
{
should_sort = true;
if (storage.column_defaults.count(it->name) != 0)
{
should_evaluate_defaults = true;
continue;
}
ColumnWithNameAndType column;
column.name = it->name;
column.type = it->type;
String offsets_name = DataTypeNested::extractNestedTableName(column.name);
if (offset_columns.count(offsets_name))
{
ColumnPtr offsets_column = offset_columns[offsets_name];
DataTypePtr nested_type = typeid_cast<DataTypeArray &>(*column.type).getNestedType();
size_t nested_rows = offsets_column->empty() ? 0
: typeid_cast<ColumnUInt64 &>(*offsets_column).getData().back();
ColumnPtr nested_column = dynamic_cast<IColumnConst &>(*nested_type->createConstColumn(
nested_rows, nested_type->getDefault())).convertToFullColumn();
column.column = new ColumnArray(nested_column, offsets_column);
}
else
{
/** We need to turn the constant column into a full one, because in some of the blocks (from other parts)
* it may be full (otherwise the interpreter might decide that it is constant everywhere).
*/
column.column = dynamic_cast<IColumnConst &>(*column.type->createConstColumn(
res.rows(), column.type->getDefault())).convertToFullColumn();
}
res.insert(column);
}
}
/// evaluate defaulted columns if necessary
if (should_evaluate_defaults)
evaluateMissingDefaults(res, columns, storage.column_defaults, storage.context);
/// remove added column to ensure same content among all blocks
if (added_column)
{
res.erase(0);
streams.erase(added_column->name);
columns.erase(std::begin(columns));
added_column = nullptr;
}
/// sort columns to ensure consistent order among all blocks
if (should_sort)
{
Block ordered_block;
for (const auto & name : ordered_names)
if (res.has(name))
ordered_block.insert(res.getByName(name));
if (res.columns() != ordered_block.columns())
throw Exception{
"Ordered block has different columns than original one:\n" +
ordered_block.dumpNames() + "\nvs.\n" + res.dumpNames(),
ErrorCodes::LOGICAL_ERROR
};
std::swap(res, ordered_block);
}
}
catch (const Exception & e)
{
/// Better diagnostics.
throw Exception(e.message() + '\n' + e.getStackTrace().toString()
+ "\n(while reading from part " + path + ")", e.code());
}
/** The same, but always reorders the columns in the block as in ordered_names
* (even if there were no missing columns).
*/
void fillMissingColumnsAndReorder(Block & res, const Names & ordered_names)
{
fillMissingColumnsImpl(res, ordered_names, true);
}
private:
@ -523,6 +434,111 @@ private:
}
}
}
void fillMissingColumnsImpl(Block & res, const Names & ordered_names, bool always_reorder)
{
try
{
/** For missing columns from a nested structure, we need to create not a column of empty arrays,
* but a column of arrays of the correct lengths.
* TODO: If, for some nested structure, only the missing columns were requested, empty arrays
* will be returned for them, even if the part contains offsets for that nested structure. This can be fixed.
*/
/// First, remember the offset columns for all arrays in the block.
OffsetColumns offset_columns;
for (size_t i = 0; i < res.columns(); ++i)
{
const ColumnWithNameAndType & column = res.getByPosition(i);
if (const ColumnArray * array = typeid_cast<const ColumnArray *>(&*column.column))
{
String offsets_name = DataTypeNested::extractNestedTableName(column.name);
offset_columns[offsets_name] = array->getOffsetsColumn();
}
}
auto should_evaluate_defaults = false;
auto should_sort = always_reorder;
for (NamesAndTypesList::const_iterator it = columns.begin(); it != columns.end(); ++it)
{
/// insert default values only for columns without default expressions
if (!res.has(it->name))
{
should_sort = true;
if (storage.column_defaults.count(it->name) != 0)
{
should_evaluate_defaults = true;
continue;
}
ColumnWithNameAndType column;
column.name = it->name;
column.type = it->type;
String offsets_name = DataTypeNested::extractNestedTableName(column.name);
if (offset_columns.count(offsets_name))
{
ColumnPtr offsets_column = offset_columns[offsets_name];
DataTypePtr nested_type = typeid_cast<DataTypeArray &>(*column.type).getNestedType();
size_t nested_rows = offsets_column->empty() ? 0
: typeid_cast<ColumnUInt64 &>(*offsets_column).getData().back();
ColumnPtr nested_column = dynamic_cast<IColumnConst &>(*nested_type->createConstColumn(
nested_rows, nested_type->getDefault())).convertToFullColumn();
column.column = new ColumnArray(nested_column, offsets_column);
}
else
{
/** We need to turn the constant column into a full one, because in some of the blocks (from other parts)
* it may be full (otherwise the interpreter might decide that it is constant everywhere).
*/
column.column = dynamic_cast<IColumnConst &>(*column.type->createConstColumn(
res.rows(), column.type->getDefault())).convertToFullColumn();
}
res.insert(column);
}
}
/// evaluate defaulted columns if necessary
if (should_evaluate_defaults)
evaluateMissingDefaults(res, columns, storage.column_defaults, storage.context);
/// remove added column to ensure same content among all blocks
if (added_column)
{
res.erase(0);
streams.erase(added_column->name);
columns.erase(std::begin(columns));
added_column = nullptr;
}
/// sort columns to ensure consistent order among all blocks
if (should_sort)
{
Block ordered_block;
for (const auto & name : ordered_names)
if (res.has(name))
ordered_block.insert(res.getByName(name));
if (res.columns() != ordered_block.columns())
throw Exception{
"Ordered block has different number of columns than original one:\n" +
ordered_block.dumpNames() + "\nvs.\n" + res.dumpNames(),
ErrorCodes::LOGICAL_ERROR};
std::swap(res, ordered_block);
}
}
catch (const Exception & e)
{
/// Better diagnostics.
throw Exception(e.message() + '\n' + e.getStackTrace().toString()
+ "\n(while reading from part " + path + ")", e.code());
}
}
};
}

View File

@ -266,7 +266,7 @@ bool TabSeparatedRowInputStream::parseRowAndPrintDiagnosticInfo(
if (exception)
{
if (data_types[i]->getName() == "DateTime")
out << "ERROR: DateTime must be in YYYY-MM-DD hh:mm:ss format.\n";
out << "ERROR: DateTime must be in YYYY-MM-DD hh:mm:ss or NNNNNNNNNN (unix timestamp, exactly 10 digits) format.\n";
else if (data_types[i]->getName() == "Date")
out << "ERROR: Date must be in YYYY-MM-DD format.\n";
else
@ -285,7 +285,7 @@ bool TabSeparatedRowInputStream::parseRowAndPrintDiagnosticInfo(
out << "\n";
if (data_types[i]->getName() == "DateTime")
out << "ERROR: DateTime must be in YYYY-MM-DD hh:mm:ss format.\n";
out << "ERROR: DateTime must be in YYYY-MM-DD hh:mm:ss or NNNNNNNNNN (unix timestamp, exactly 10 digits) format.\n";
else if (data_types[i]->getName() == "Date")
out << "ERROR: Date must be in YYYY-MM-DD format.\n";

View File

@ -69,7 +69,7 @@ void readString(String & s, ReadBuffer & buf)
*
* Uses SSE2, which gives roughly a 1.7x speedup (compared to a trivial loop)
* when parsing a typical tab-separated file with strings.
* SSE4.2 could have been used, but it is not yet supported on all of our servers.
* SSE4.2 could have been used, but at the time this code was written it was not supported on all of our servers (by now it is supported everywhere).
* When parsing a file with short strings, there is no performance degradation.
*/
static inline const char * find_first_tab_lf_or_backslash(const char * begin, const char * end)
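The body of this scanner lies outside the hunk. A minimal SSE2 sketch in the spirit the comment describes (hypothetical, not the repository's implementation): compare 16 bytes at a time against '\t', '\n' and '\\', then fall back to a scalar tail.

#include <emmintrin.h>  // SSE2 intrinsics

static inline const char * find_first_tab_lf_or_backslash_sse2(const char * begin, const char * end)
{
    for (; begin + 16 <= end; begin += 16)
    {
        const __m128i block = _mm_loadu_si128(reinterpret_cast<const __m128i *>(begin));
        const __m128i eq = _mm_or_si128(
            _mm_or_si128(
                _mm_cmpeq_epi8(block, _mm_set1_epi8('\t')),
                _mm_cmpeq_epi8(block, _mm_set1_epi8('\n'))),
            _mm_cmpeq_epi8(block, _mm_set1_epi8('\\')));
        const int mask = _mm_movemask_epi8(eq);    // one bit per matching byte
        if (mask)
            return begin + __builtin_ctz(mask);
    }
    for (; begin < end; ++begin)                   // scalar tail, under 16 bytes
        if (*begin == '\t' || *begin == '\n' || *begin == '\\')
            return begin;
    return end;
}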
@ -232,6 +232,44 @@ void readBackQuotedString(String & s, ReadBuffer & buf)
}
void readDateTimeTextFallback(time_t & datetime, ReadBuffer & buf)
{
char s[19];
size_t size = buf.read(s, 10);
if (10 != size)
{
s[size] = 0;
throw Exception(std::string("Cannot parse datetime ") + s, ErrorCodes::CANNOT_PARSE_DATETIME);
}
if (s[4] < '0' || s[4] > '9')
{
size_t size = buf.read(&s[10], 9);
if (9 != size)
{
s[10 + size] = 0;
throw Exception(std::string("Cannot parse datetime ") + s, ErrorCodes::CANNOT_PARSE_DATETIME);
}
UInt16 year = (s[0] - '0') * 1000 + (s[1] - '0') * 100 + (s[2] - '0') * 10 + (s[3] - '0');
UInt8 month = (s[5] - '0') * 10 + (s[6] - '0');
UInt8 day = (s[8] - '0') * 10 + (s[9] - '0');
UInt8 hour = (s[11] - '0') * 10 + (s[12] - '0');
UInt8 minute = (s[14] - '0') * 10 + (s[15] - '0');
UInt8 second = (s[17] - '0') * 10 + (s[18] - '0');
if (unlikely(year == 0))
datetime = 0;
else
datetime = DateLUT::instance().makeDateTime(year, month, day, hour, minute, second);
}
else
datetime = parse<time_t>(s, 10);
}
void readException(Exception & e, ReadBuffer & buf, const String & additional_message)
{
int code = 0;
@ -239,7 +277,7 @@ void readException(Exception & e, ReadBuffer & buf, const String & additional_me
String message;
String stack_trace;
bool has_nested = false;
readBinary(code, buf);
readBinary(name, buf);
readBinary(message, buf);

View File

@ -492,37 +492,51 @@ Context & Context::getGlobalContext()
const Dictionaries & Context::getDictionaries() const
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
tryCreateDictionaries();
return *shared->dictionaries;
return getDictionariesImpl(false);
}
const ExternalDictionaries & Context::getExternalDictionaries() const
{
return getExternalDictionariesImpl(false);
}
const Dictionaries & Context::getDictionariesImpl(const bool throw_on_error) const
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
tryCreateExternalDictionaries();
return *shared->external_dictionaries;
}
void Context::tryCreateDictionaries(const bool throw_on_error) const
{
if (!shared->dictionaries)
shared->dictionaries = new Dictionaries{throw_on_error};
return *shared->dictionaries;
}
void Context::tryCreateExternalDictionaries(const bool throw_on_error) const
const ExternalDictionaries & Context::getExternalDictionariesImpl(const bool throw_on_error) const
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
if (!shared->external_dictionaries)
{
if (!this->global_context)
throw Exception("Logical error: there is no global context", ErrorCodes::LOGICAL_ERROR);
shared->external_dictionaries = new ExternalDictionaries{*this->global_context, throw_on_error};
}
return *shared->external_dictionaries;
}
void Context::tryCreateDictionaries() const
{
static_cast<void>(getDictionariesImpl(true));
}
void Context::tryCreateExternalDictionaries() const
{
static_cast<void>(getExternalDictionariesImpl(true));
}
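The refactoring collapses the locking and lazy construction into a single implementation, with the public accessors differing only in whether construction errors propagate. A bare-bones model of the pattern (illustrative types, not the commit's classes):

#include <memory>
#include <mutex>

struct Dicts { explicit Dicts(bool /*throw_on_error*/) { /* load dictionaries here */ } };

struct Holder
{
    std::mutex mutex;
    std::unique_ptr<Dicts> dicts;

    Dicts & getImpl(bool throw_on_error)
    {
        std::lock_guard<std::mutex> lock(mutex);   // serialize lazy construction
        if (!dicts)
            dicts.reset(new Dicts(throw_on_error));
        return *dicts;
    }

    Dicts & get() { return getImpl(false); }                 // lazy access, errors handled inside
    void tryCreate() { static_cast<void>(getImpl(true)); }   // eager creation, throws on error
};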

View File

@ -25,6 +25,7 @@
#include <DB/Interpreters/ExpressionAnalyzer.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/DataTypes/DataTypeNested.h>
#include <DB/DataTypes/DataTypeFixedString.h>
namespace DB
@ -293,13 +294,31 @@ InterpreterCreateQuery::ColumnsAndDefaults InterpreterCreateQuery::parseColumns(
* 2. conversion of expression (1) to explicitly-specified type alias as column name */
if (col_decl.type)
{
const auto tmp_column_name = col_decl.name + "_tmp";
const auto & final_column_name = col_decl.name;
const auto conversion_function_name = "to" + columns.back().type->getName();
const auto tmp_column_name = final_column_name + "_tmp";
const auto data_type_ptr = columns.back().type.get();
default_expr_list->children.emplace_back(setAlias(
makeASTFunction(conversion_function_name, ASTPtr{new ASTIdentifier{{}, tmp_column_name}}),
final_column_name));
/// specific code for different data types, e.g. toFixedString(col, N) for DataTypeFixedString
if (const auto fixed_string = typeid_cast<const DataTypeFixedString *>(data_type_ptr))
{
const auto conversion_function_name = "toFixedString";
default_expr_list->children.emplace_back(setAlias(
makeASTFunction(
conversion_function_name,
ASTPtr{new ASTIdentifier{{}, tmp_column_name}},
ASTPtr{new ASTLiteral{{}, fixed_string->getN()}}),
final_column_name));
}
else
{
/// @todo fix for parametric types, results in broken code, i.e. toArray(ElementType)(col)
const auto conversion_function_name = "to" + data_type_ptr->getName();
default_expr_list->children.emplace_back(setAlias(
makeASTFunction(conversion_function_name, ASTPtr{new ASTIdentifier{{}, tmp_column_name}}),
final_column_name));
}
default_expr_list->children.emplace_back(setAlias(col_decl.default_expression->clone(), tmp_column_name));
}
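For a feel of what this branch builds (illustrative only): the default expression is evaluated under a temporary alias, then converted back to the declared type, with FixedString needing its length as an extra argument. A tiny sketch rendering the resulting expressions as text, with hypothetical names:

#include <iostream>
#include <string>

// Renders the conversion expression generated for `name type DEFAULT <expr>`:
// the default is computed as name_tmp, then cast back to the declared type.
std::string conversionExpr(const std::string & name, const std::string & type, size_t fixed_n = 0)
{
    const std::string tmp = name + "_tmp";
    if (fixed_n)    // parametric FixedString(N) needs an explicit length argument
        return "toFixedString(" + tmp + ", " + std::to_string(fixed_n) + ") AS " + name;
    return "to" + type + "(" + tmp + ") AS " + name;
}

int main()
{
    std::cout << conversionExpr("c", "FixedString", 16) << '\n';  // toFixedString(c_tmp, 16) AS c
    std::cout << conversionExpr("c", "UInt32") << '\n';           // toUInt32(c_tmp) AS c
}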

View File

@ -8,6 +8,7 @@
#include <Yandex/ErrorHandlers.h>
#include <Yandex/Revision.h>
#include <statdaemons/ConfigProcessor.h>
#include <statdaemons/ext/scope_guard.hpp>
#include <memory>
#include <DB/Interpreters/loadMetadata.h>
@ -536,6 +537,25 @@ int Server::main(const std::vector<std::string> & args)
global_context->setCurrentDatabase(config().getString("default_database", "default"));
SCOPE_EXIT(
LOG_DEBUG(log, "Closed all connections.");
/** Ask all table engines to finish their background work.
* It is important to do this in advance, not in the Context destructor, because
* table engines may still be using the Context while being destroyed.
*/
LOG_INFO(log, "Shutting down storages.");
global_context->shutdown();
LOG_DEBUG(log, "Shutted down storages.");
/** Explicitly destroy the context: this is more convenient than doing it in the Server destructor,
* since the logger is still available. At this point, no one else should own the shared part of the context.
*/
global_context.reset();
LOG_DEBUG(log, "Destroyed global context.");
);
{
const auto profile_events_transmitter = config().getBool("use_graphite", true)
? std::make_unique<ProfileEventsTransmitter>()
@ -609,54 +629,39 @@ int Server::main(const std::vector<std::string> & args)
if (olap_http_server)
olap_http_server->start();
LOG_INFO(log, "Ready for connections.");
SCOPE_EXIT(
LOG_DEBUG(log, "Received termination signal. Waiting for current connections to close.");
users_config_reloader.reset();
is_cancelled = true;
http_server.stop();
tcp_server.stop();
if (use_olap_server)
olap_http_server->stop();
);
/// try to load dictionaries immediately, throw on error and die
try
{
if (!config().getBool("dictionaries_lazy_load", true))
{
global_context->tryCreateDictionaries(true);
global_context->tryCreateExternalDictionaries(true);
global_context->tryCreateDictionaries();
global_context->tryCreateExternalDictionaries();
}
LOG_INFO(log, "Ready for connections.");
waitForTerminationRequest();
}
catch (...)
{
LOG_ERROR(log, "Caught exception while loading dictionaries.");
tryLogCurrentException(log);
throw;
}
LOG_DEBUG(log, "Received termination signal. Waiting for current connections to close.");
users_config_reloader.reset();
is_cancelled = true;
http_server.stop();
tcp_server.stop();
if (use_olap_server)
olap_http_server->stop();
}
LOG_DEBUG(log, "Closed all connections.");
/** Ask all table engines to finish their background work.
* It is important to do this in advance, not in the Context destructor, because
* table engines may still be using the Context while being destroyed.
*/
LOG_INFO(log, "Shutting down storages.");
global_context->shutdown();
LOG_DEBUG(log, "Shutted down storages.");
/** Explicitly destroy the context: this is more convenient than doing it in the Server destructor,
* since the logger is still available. At this point, no one else should own the shared part of the context.
*/
global_context.reset();
LOG_DEBUG(log, "Destroyed global context.");
return Application::EXIT_OK;
}
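The shutdown sequence now lives in SCOPE_EXIT blocks, so it also runs when dictionary loading throws, instead of being skipped on the exception path. A generic scope-guard sketch (a simplified stand-in; the project's SCOPE_EXIT macro from statdaemons/ext/scope_guard.hpp is assumed to behave like this):

#include <utility>

template <typename F>
struct ScopeGuard
{
    F f;
    ~ScopeGuard() { f(); }   // runs on normal scope exit and during stack unwinding
};

template <typename F>
ScopeGuard<F> makeScopeGuard(F f)
{
    return ScopeGuard<F>{std::move(f)};
}

// Usage sketch: auto on_exit = makeScopeGuard([] { /* stop servers, shut down storages */ });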

View File

@ -3,6 +3,7 @@
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/DataTypes/DataTypeNested.h>
#include <DB/DataTypes/DataTypeArray.h>
#include <DB/DataTypes/DataTypeFixedString.h>
#include <DB/Interpreters/Context.h>
#include <DB/Interpreters/ExpressionAnalyzer.h>
#include <DB/Parsers/ASTIdentifier.h>
@ -212,13 +213,31 @@ namespace DB
{
if (command.data_type)
{
const auto & column_name = command.column_name;
const auto tmp_column_name = column_name + "_tmp";
const auto conversion_function_name = "to" + command.data_type->getName();
const auto & final_column_name = command.column_name;
const auto tmp_column_name = final_column_name + "_tmp";
const auto data_type_ptr = command.data_type.get();
default_expr_list->children.emplace_back(setAlias(
makeASTFunction(conversion_function_name, ASTPtr{new ASTIdentifier{{}, tmp_column_name}}),
column_name));
/// specific code for different data types, e.g. toFixedString(col, N) for DataTypeFixedString
if (const auto fixed_string = typeid_cast<const DataTypeFixedString *>(data_type_ptr))
{
const auto conversion_function_name = "toFixedString";
default_expr_list->children.emplace_back(setAlias(
makeASTFunction(
conversion_function_name,
ASTPtr{new ASTIdentifier{{}, tmp_column_name}},
ASTPtr{new ASTLiteral{{}, fixed_string->getN()}}),
final_column_name));
}
else
{
/// @todo fix for parametric types, results in broken code, i.e. toArray(ElementType)(col)
const auto conversion_function_name = "to" + data_type_ptr->getName();
default_expr_list->children.emplace_back(setAlias(
makeASTFunction(conversion_function_name, ASTPtr{new ASTIdentifier{{}, tmp_column_name}}),
final_column_name));
}
default_expr_list->children.emplace_back(setAlias(command.default_expression->clone(), tmp_column_name));

View File

@ -13,6 +13,7 @@
#include <DB/IO/WriteBufferFromFile.h>
#include <DB/IO/CompressedReadBuffer.h>
#include <DB/DataTypes/DataTypeDate.h>
#include <DB/DataTypes/DataTypeFixedString.h>
#include <DB/Common/localBackup.h>
#include <DB/Functions/FunctionFactory.h>
@ -466,7 +467,8 @@ void MergeTreeData::createConvertExpression(const DataPartPtr & part, const Name
}
else
{
String new_type_name = new_types[column.name]->getName();
const auto new_type = new_types[column.name].get();
const String new_type_name = new_type->getName();
if (new_type_name != column.type->getName() &&
(!part || part->hasColumnFiles(column.name)))
@ -478,13 +480,31 @@ void MergeTreeData::createConvertExpression(const DataPartPtr & part, const Name
out_expression->addInput(ColumnWithNameAndType(nullptr, column.type, column.name));
const FunctionPtr & function = FunctionFactory::instance().get("to" + new_type_name, context);
Names out_names;
out_expression->add(ExpressionAction::applyFunction(function, Names(1, column.name)), out_names);
if (const auto fixed_string = typeid_cast<const DataTypeFixedString *>(new_type))
{
const auto width = fixed_string->getN();
const auto string_width_column = toString(width);
out_expression->addInput({ new ColumnConstUInt64{1, width}, new DataTypeUInt64, string_width_column });
const auto function = FunctionFactory::instance().get("toFixedString", context);
out_expression->add(ExpressionAction::applyFunction(function, Names{
column.name, string_width_column
}), out_names);
out_expression->add(ExpressionAction::removeColumn(string_width_column));
}
else
{
const FunctionPtr & function = FunctionFactory::instance().get("to" + new_type_name, context);
out_expression->add(ExpressionAction::applyFunction(function, Names{column.name}), out_names);
}
out_expression->add(ExpressionAction::removeColumn(column.name));
String escaped_expr = escapeForFileName(out_names[0]);
String escaped_column = escapeForFileName(column.name);
const String escaped_expr = escapeForFileName(out_names[0]);
const String escaped_column = escapeForFileName(column.name);
out_rename_map[escaped_expr + ".bin"] = escaped_column + ".bin";
out_rename_map[escaped_expr + ".mrk"] = escaped_column + ".mrk";
}

View File

@ -227,12 +227,11 @@ class StorageChunkMerger::MergeTask
{
public:
MergeTask(const StorageChunkMerger & chunk_merger_, DB::Context & context_, Logger * log_)
:
shutdown_called(false),
chunk_merger(chunk_merger_),
context(context_),
log(log_),
merging(false)
: shutdown_called(false),
chunk_merger(chunk_merger_),
context(context_),
log(log_),
merging(false)
{
}
@ -551,6 +550,7 @@ bool StorageChunkMerger::MergeTask::mergeChunks(const Storages & chunks)
if (shutdown_called)
{
LOG_INFO(log, "Shutdown requested while merging chunks.");
output->writeSuffix();
new_storage.removeReference(); /// After this, the temporary data will be removed.
return false;
}

View File

@ -164,7 +164,21 @@ StorageChunks::StorageChunks(
if (!attach)
reference_counter.add(1, true);
loadIndex();
_table_column_name = "_table" + VirtualColumnUtils::chooseSuffix(getColumnsList(), "_table");
try
{
loadIndex();
}
catch (Exception & e)
{
if (e.code() != ErrorCodes::SIZES_OF_MARKS_FILES_ARE_INCONSISTENT)
throw;
e.addMessage("Table " + name_ + " is broken and loaded as empty.");
tryLogCurrentException(__PRETTY_FUNCTION__);
return;
}
/// Create all the tables of type ChunkRef. They must reside in the same database.
{
@ -180,8 +194,6 @@ StorageChunks::StorageChunks(
context.addTable(database_name, it->first, StorageChunkRef::create(it->first, context, database_name, name, true));
}
}
_table_column_name = "_table" + VirtualColumnUtils::chooseSuffix(getColumnsList(), "_table");
}
NameAndTypePair StorageChunks::getColumn(const String & column_name) const

View File

@ -103,13 +103,25 @@ public:
addStream(column.name, *column.type);
}
~LogBlockOutputStream() { writeSuffix(); }
~LogBlockOutputStream()
{
try
{
writeSuffix();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
void write(const Block & block);
void writeSuffix();
private:
StorageLog & storage;
Poco::ScopedWriteRWLock lock;
bool done = false;
struct Stream
{
@ -362,6 +374,10 @@ void LogBlockOutputStream::write(const Block & block)
void LogBlockOutputStream::writeSuffix()
{
if (done)
return;
done = true;
/// Finish writing.
marks_stream.next();
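Both Log and TinyLog now share the same pattern: writeSuffix() is made idempotent with a done flag, so it can run both explicitly and from the destructor, and the destructor swallows and logs exceptions rather than letting them escape. A condensed model (illustrative, not the storage classes):

#include <iostream>

struct OutputStreamModel
{
    bool done = false;

    void writeSuffix()
    {
        if (done)
            return;          // second and later calls are no-ops
        done = true;
        // ... finalize streams / flush marks here ...
    }

    ~OutputStreamModel()
    {
        try
        {
            writeSuffix();
        }
        catch (...)
        {
            // never let an exception escape a destructor; log it instead
            std::cerr << "writeSuffix failed during destruction\n";
        }
    }
};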

View File

@ -86,13 +86,22 @@ public:
~TinyLogBlockOutputStream()
{
writeSuffix();
try
{
writeSuffix();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
void write(const Block & block);
void writeSuffix();
private:
StorageTinyLog & storage;
bool done = false;
struct Stream
{
@ -349,6 +358,10 @@ void TinyLogBlockOutputStream::writeData(const String & name, const IDataType &
void TinyLogBlockOutputStream::writeSuffix()
{
if (done)
return;
done = true;
/// Finish writing.
for (FileStreams::iterator it = streams.begin(); it != streams.end(); ++it)
it->second->finalize();

View File

@ -0,0 +1 @@
SELECT min(ts = toUInt32(toDateTime(toString(ts)))) FROM (SELECT 1000000000 + 1234 * number AS ts FROM system.numbers LIMIT 1000000);

View File

@ -0,0 +1,2 @@
2015-01-01 hello world
2015-01-01 hello1 xxx world1

View File

@ -0,0 +1,11 @@
DROP TABLE IF EXISTS test.prewhere;
CREATE TABLE test.prewhere (d Date, a String, b String) ENGINE = MergeTree(d, d, 8192);
INSERT INTO test.prewhere VALUES ('2015-01-01', 'hello', 'world');
ALTER TABLE test.prewhere ADD COLUMN a1 String AFTER a;
INSERT INTO test.prewhere VALUES ('2015-01-01', 'hello1', 'xxx', 'world1');
SELECT d, a, a1, b FROM test.prewhere PREWHERE a LIKE 'hello%' ORDER BY a1;
DROP TABLE test.prewhere;

View File

@ -0,0 +1 @@
2009-02-01 1234567890

View File

@ -0,0 +1,7 @@
DROP TABLE IF EXISTS test.default;
CREATE TABLE test.default (d Date DEFAULT toDate(t), t DateTime) ENGINE = MergeTree(d, t, 8192);
INSERT INTO test.default (t) VALUES ('1234567890');
SELECT toStartOfMonth(d), toUInt32(t) FROM test.default;
DROP TABLE test.default;

View File

@ -0,0 +1 @@
SELECT min(ts = toUInt32(toDateTime(toString(ts)))) FROM (SELECT 1000000000 + 1234 * number AS ts FROM system.numbers LIMIT 1000000);