This commit is contained in:
Evgeniy Gatov 2015-03-23 20:24:39 +03:00
commit 3d95ab4993
8 changed files with 121 additions and 37 deletions

View File

@ -155,6 +155,11 @@ public:
return data.size() * sizeof(data[0]);
}
void insert(const T value)
{
data.push_back(value);
}
int compareAt(size_t n, size_t m, const IColumn & rhs_, int nan_direction_hint) const override
{
return CompareHelper<T>::compare(data[n], static_cast<const Self &>(rhs_).data[m], nan_direction_hint);

View File

@ -253,7 +253,7 @@ public:
if (n > old_size)
{
reserve(n);
std::fill(t_end(), reinterpret_cast<T *>(c_end + n - old_size), value);
std::fill(t_end(), t_end() + n - old_size, value);
}
c_end = c_start + byte_size(n);
}

View File

@ -455,7 +455,10 @@ private:
}
cell.id = id;
if (dict_lifetime.min_sec != 0 && dict_lifetime.max_sec != 0)
cell.expires_at = std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)};
else
cell.expires_at = std::chrono::time_point<std::chrono::system_clock>::max();
on_cell_updated(id, cell_idx);
remaining_ids[id] = 1;
@ -477,7 +480,10 @@ private:
setDefaultAttributeValue(attribute, cell_idx);
cell.id = id;
if (dict_lifetime.min_sec != 0 && dict_lifetime.max_sec != 0)
cell.expires_at = std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)};
else
cell.expires_at = std::chrono::time_point<std::chrono::system_clock>::max();
on_cell_updated(id, cell_idx);
}

View File

@ -144,8 +144,18 @@ struct DictionaryStructure
const auto null_value_string = config.getString(prefix + "null_value");
Field null_value;
try
{
ReadBufferFromString null_value_buffer{null_value_string};
type->deserializeText(null_value, null_value_buffer);
}
catch (const std::exception & e)
{
throw Exception{
std::string{"Error parsing null_value: "} + e.what(),
ErrorCodes::BAD_ARGUMENTS
};
}
const auto hierarchical = config.getBool(prefix + "hierarchical", false);
const auto injective = config.getBool(prefix + "injective", false);

View File

@ -262,7 +262,7 @@ private:
{
auto & array = *std::get<std::unique_ptr<PODArray<T>>>(attribute.arrays);
if (id >= array.size())
array.resize_fill(id, std::get<T>(attribute.null_values));
array.resize_fill(id + 1, std::get<T>(attribute.null_values));
array[id] = value;
}
@ -290,7 +290,7 @@ private:
{
auto & array = *std::get<std::unique_ptr<PODArray<StringRef>>>(attribute.arrays);
if (id >= array.size())
array.resize_fill(id, std::get<String>(attribute.null_values));
array.resize_fill(id + 1, std::get<String>(attribute.null_values));
const auto & string = value.get<String>();
const auto string_in_arena = attribute.string_arena->insert(string.data(), string.size());
array[id] = StringRef{string_in_arena, string.size()};

View File

@ -6,6 +6,7 @@
#include <DB/DataTypes/DataTypeString.h>
#include <DB/DataTypes/DataTypeDate.h>
#include <DB/DataTypes/DataTypeDateTime.h>
#include <DB/Columns/ColumnString.h>
#include <statdaemons/ext/range.hpp>
#include <mysqlxx/Query.h>
#include <vector>
@ -42,6 +43,13 @@ public:
: entry{entry}, query{this->entry->query(query_str)}, result{query.use()},
sample_block{sample_block}, max_block_size{max_block_size}
{
if (sample_block.columns() != result.getNumFields())
throw Exception{
"mysqlxx::UseQueryResult contains " + toString(result.getNumFields()) + " columns while " +
toString(sample_block.columns()) + " expected",
ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH
};
types.reserve(sample_block.columns());
for (const auto idx : ext::range(0, sample_block.columns()))
@ -91,48 +99,82 @@ public:
private:
Block readImpl() override
{
auto row = result.fetch();
if (!row)
return {};
auto block = sample_block.cloneEmpty();
if (block.columns() != result.getNumFields())
throw Exception{
"mysqlxx::UseQueryResult contains " + toString(result.getNumFields()) + " columns while " +
toString(block.columns()) + " expected",
ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH
};
/// cache pointers returned by the calls to getByPosition
std::vector<IColumn *> columns(block.columns());
for (const auto i : ext::range(0, columns.size()))
columns[i] = block.getByPosition(i).column.get();
std::size_t rows = 0;
while (auto row = result.fetch())
std::size_t num_rows = 0;
while (row)
{
/// @todo cache pointers returned by the calls to getByPosition
for (const auto idx : ext::range(0, row.size()))
insertValue(block.getByPosition(idx).column, row[idx], types[idx]);
++rows;
if (rows == max_block_size)
break;
{
const auto value = row[idx];
if (!value.isNull())
insertValue(columns[idx], types[idx], value);
else
insertDefaultValue(columns[idx], types[idx]);
}
return rows == 0 ? Block{} : block;
};
++num_rows;
if (num_rows == max_block_size)
break;
static void insertValue(ColumnPtr & column, const mysqlxx::Value & value, const value_type_t type)
row = result.fetch();
}
return block;
}
static void insertValue(IColumn * const column, const value_type_t type, const mysqlxx::Value & value)
{
switch (type)
{
case value_type_t::UInt8: column->insert(static_cast<UInt64>(value)); break;
case value_type_t::UInt16: column->insert(static_cast<UInt64>(value)); break;
case value_type_t::UInt32: column->insert(static_cast<UInt64>(value)); break;
case value_type_t::UInt64: column->insert(static_cast<UInt64>(value)); break;
case value_type_t::Int8: column->insert(static_cast<Int64>(value)); break;
case value_type_t::Int16: column->insert(static_cast<Int64>(value)); break;
case value_type_t::Int32: column->insert(static_cast<Int64>(value)); break;
case value_type_t::Int64: column->insert(static_cast<Int64>(value)); break;
case value_type_t::Float32: column->insert(static_cast<Float64>(value)); break;
case value_type_t::Float64: column->insert(static_cast<Float64>(value)); break;
case value_type_t::String: column->insert(value.getString()); break;
case value_type_t::Date: column->insert(static_cast<UInt64>(UInt16{value.getDate().getDayNum()})); break;
case value_type_t::DateTime: column->insert(static_cast<UInt64>(time_t{value.getDateTime()})); break;
};
case value_type_t::UInt8: static_cast<ColumnUInt8 *>(column)->insert(value.getUInt()); break;
case value_type_t::UInt16: static_cast<ColumnUInt16 *>(column)->insert(value.getUInt()); break;
case value_type_t::UInt32: static_cast<ColumnUInt32 *>(column)->insert(value.getUInt()); break;
case value_type_t::UInt64: static_cast<ColumnUInt64 *>(column)->insert(value.getUInt()); break;
case value_type_t::Int8: static_cast<ColumnInt8 *>(column)->insert(value.getInt()); break;
case value_type_t::Int16: static_cast<ColumnInt16 *>(column)->insert(value.getInt()); break;
case value_type_t::Int32: static_cast<ColumnInt32 *>(column)->insert(value.getInt()); break;
case value_type_t::Int64: static_cast<ColumnInt64 *>(column)->insert(value.getInt()); break;
case value_type_t::Float32: static_cast<ColumnFloat32 *>(column)->insert(value.getDouble()); break;
case value_type_t::Float64: static_cast<ColumnFloat64 *>(column)->insert(value.getDouble()); break;
case value_type_t::String:
{
const auto string = value.getString();
static_cast<ColumnString *>(column)->insertDataWithTerminatingZero(string.data(), string.size() + 1);
break;
}
case value_type_t::Date: static_cast<ColumnUInt16 *>(column)->insert(UInt16{value.getDate().getDayNum()}); break;
case value_type_t::DateTime: static_cast<ColumnUInt32 *>(column)->insert(time_t{value.getDateTime()}); break;
}
}
static void insertDefaultValue(IColumn * const column, const value_type_t type)
{
switch (type)
{
case value_type_t::UInt8: static_cast<ColumnUInt8 *>(column)->insertDefault(); break;
case value_type_t::UInt16: static_cast<ColumnUInt16 *>(column)->insertDefault(); break;
case value_type_t::UInt32: static_cast<ColumnUInt32 *>(column)->insertDefault(); break;
case value_type_t::UInt64: static_cast<ColumnUInt64 *>(column)->insertDefault(); break;
case value_type_t::Int8: static_cast<ColumnInt8 *>(column)->insertDefault(); break;
case value_type_t::Int16: static_cast<ColumnInt16 *>(column)->insertDefault(); break;
case value_type_t::Int32: static_cast<ColumnInt32 *>(column)->insertDefault(); break;
case value_type_t::Int64: static_cast<ColumnInt64 *>(column)->insertDefault(); break;
case value_type_t::Float32: static_cast<ColumnFloat32 *>(column)->insertDefault(); break;
case value_type_t::Float64: static_cast<ColumnFloat64 *>(column)->insertDefault(); break;
case value_type_t::String: static_cast<ColumnString *>(column)->insertDefault(); break;
case value_type_t::Date: static_cast<ColumnUInt16 *>(column)->insertDefault(); break;
case value_type_t::DateTime: static_cast<ColumnUInt32 *>(column)->insertDefault(); break;
}
}
mysqlxx::PoolWithFailover::Entry entry;

View File

@ -0,0 +1,18 @@
#pragma once
#include <DB/IO/IBlockInputStream.h>
namespace DB
{
class TypeCheckingBlockInputStream : public IBlockInputStream
{
public:
TypeCheckingBlockInputStream(const BlockInputStreamPtr & source, Block sample_block)
{
}
};
}

View File

@ -132,7 +132,10 @@ void ExternalDictionaries::reloadFromFile(const std::string & config_path)
if (0 != strncmp(key.data(), "dictionary", strlen("dictionary")))
{
LOG_WARNING(log, config_path << ": unknown node in dictionaries file: '" << key + "', 'dictionary'");
if (0 != strncmp(key.data(), "comment", strlen("comment")))
LOG_WARNING(log,
config_path << ": unknown node in dictionaries file: '" << key + "', 'dictionary'");
continue;
}