179 lines
5.7 KiB
C++
179 lines
5.7 KiB
C++
#include "matador/orm/session.hpp"
|
|
|
|
#include "matador/sql/backend_provider.hpp"
|
|
|
|
#include "matador/query/query.hpp"
|
|
|
|
#include <queue>
|
|
#include <stdexcept>
|
|
|
|
namespace matador::orm {
|
|
// Builds a utils::error from an ORM error code and a human-readable message.
//
// ec:  error code describing the failure category
// msg: descriptive text attached to the error
// Returns the newly constructed utils::error.
utils::error make_error(const error_code ec, const std::string &msg) {
  return utils::error(ec, msg);
}
|
|
|
|
// Constructs a session from the given context.
//
// - cache_   is set up with the context's bus, pool and cache size.
// - dialect_ is looked up from the backend provider singleton using the
//            database type reported by the pool's connection info.
// - schema_  is a freshly allocated object::schema named after the dialect's
//            default schema name.
//
// NOTE(review): the schema_ initializer reads dialect_, so dialect_ must be
// declared before schema_ in the class definition -- confirm in the header.
session::session(session_context&& ctx)
  : cache_(ctx.bus, ctx.pool, ctx.cache_size)
  , dialect_(sql::backend_provider::instance().connection_dialect(ctx.pool.info().type))
  , schema_(std::make_unique<object::schema>(dialect_.default_schema_name())) {
}
|
|
|
|
// Creates a table for every node of the attached schema.
//
// The function works in three phases:
//   1. Builds a dependency graph (node name -> names of the nodes reachable
//      through its endpoints) together with an in-degree table, and prints
//      the graph to std::cout.
//   2. Compiles and executes a CREATE TABLE statement per schema node,
//      collecting the SQL of any additional commands the compile step emits.
//   3. Executes the collected additional commands (e.g. ALTER TABLE ADD FK)
//      after all tables have been created.
//
// Returns ok on success; otherwise the failure of the first statement that
// could not be executed, or a NoConnectionAvailable error when no valid
// connection could be acquired from the pool.
//
// NOTE(review): the dependency graph and the in-degree table are currently
// only built (and the graph printed); they do not yet influence the order in
// which tables are created.
utils::result<void, utils::error> session::create_schema() const {
  // Step 1: Build dependency graph
  std::unordered_map<std::string, std::vector<std::string> > dependency_graph;
  std::unordered_map<std::string, std::pair<int, object::schema::node_ptr>> in_degree;

  for (const auto &node: *schema_) {
    for (auto it = node->info().endpoint_begin(); it != node->info().endpoint_end(); ++it) {
      // Copy the target name once instead of re-evaluating the accessor chain.
      const std::string target_name = it->second->node().name();
      dependency_graph[node->name()].push_back(target_name);

      if (const auto dit = in_degree.find(target_name); dit == in_degree.end()) {
        in_degree[target_name] = std::make_pair(1, it->second->node_ptr());
      } else {
        // Reuse the iterator from the lookup above; no second hash lookup.
        dit->second.first++;
      }
    }

    // Ensure the current node exists in the graph representation
    if (in_degree.find(node->name()) == in_degree.end()) {
      in_degree[node->name()] = std::make_pair(0, node);
    }
  }

  for (const auto &it : dependency_graph) {
    std::cout << "Dependency graph " << it.first << std::endl;
    for (const auto &neighbor: it.second) {
      std::cout << "  " << neighbor << std::endl;
    }
    std::cout << std::endl;
  }

  std::vector<std::string> fk_sql_commands;
  auto c = cache_.pool().acquire();
  // Guard against an exhausted/invalid pool before dereferencing the
  // connection, consistent with the other connection-using members.
  if (!c.valid()) {
    return utils::failure(make_error(error_code::NoConnectionAvailable, "Failed to acquire connection."));
  }

  // Step 2: create every table, deferring the additional commands.
  for (const auto &node: *schema_) {
    auto ctx = query::query::create()
      .table(node->name(), node->info().definition().columns())
      .compile(*c);

    for (const auto &[sql, command] : ctx.additional_commands) {
      fk_sql_commands.push_back(sql);
    }
    std::cout << ctx.sql << std::endl;
    if (auto result = c->execute(ctx.sql); !result) {
      return utils::failure(result.err());
    }
  }

  // Step 3: execute additional commands (e.g. ALTER TABLE ADD FK)
  for (const auto &sql: fk_sql_commands) {
    std::cout << sql << std::endl;
    if (auto result = c->execute(sql); !result) {
      return utils::failure(result.err());
    }
  }
  return utils::ok<void>();
}
|
|
|
|
// Drops the table with the given name by building a DROP query and
// executing it through this session.
//
// table_name: name of the table to drop
// Returns ok on success, otherwise the error reported by the query execution.
utils::result<void, utils::error> session::drop_table(const std::string &table_name) const {
  if (auto dropped = query::query::drop().table(table_name).execute(*this); dropped.is_error()) {
    return utils::failure(dropped.err());
  }

  return utils::ok<void>();
}
|
|
|
|
// Executes the given query and returns its result as a set of sql::record.
//
// Before fetching, the table description is looked up in prototypes_ (and
// obtained via describe() and stored there on a miss). The types of the
// query's prototype columns are then overwritten with the types of the
// equally named described columns.
//
// q: compiled query context to execute
// Returns the query result, or a failure when no connection is available,
// the table cannot be described, or the fetch itself fails.
utils::result<sql::query_result<sql::record>, utils::error> session::fetch_all(const sql::query_context &q) const {
  auto conn = cache_.pool().acquire();
  if (!conn.valid()) {
    return utils::failure(make_error(error_code::NoConnectionAvailable, "Failed to acquire connection."));
  }

  // Look up the cached table description; describe and cache it on a miss.
  auto proto_it = prototypes_.find(q.table.name);
  if (proto_it == prototypes_.end()) {
    auto described = conn->describe(q.table.name);
    if (!described) {
      return utils::failure(described.err());
    }
    proto_it = prototypes_.emplace(q.table.name, *described).first;
  }

  // adjust columns from given query
  auto &known_columns = proto_it->second;
  for (auto &col: q.prototype) {
    const auto match = known_columns.find(col.name());
    if (match == known_columns.end()) {
      continue;
    }
    // NOTE(review): this mutates the caller's const query context in place.
    const_cast<object::attribute_definition &>(col).type(match->type());
  }

  auto fetched = fetch(q);
  if (!fetched) {
    return utils::failure(fetched.err());
  }
  return utils::ok(sql::query_result<sql::record>{std::move(*fetched)});
}
|
|
|
|
// Executes a raw SQL string by wrapping it in a query context and
// forwarding to the query-context overload of execute().
//
// sql: SQL statement text
// Returns whatever the query-context overload returns for that statement.
utils::result<size_t, utils::error> session::execute(const std::string &sql) const {
  const sql::query_context raw_ctx{sql};
  return execute(raw_ctx);
}
|
|
|
|
std::vector<object::attribute_definition> session::describe_table(const std::string &table_name) const {
|
|
const auto c = cache_.pool().acquire();
|
|
if (!c.valid()) {
|
|
throw std::logic_error("no database connection available");
|
|
}
|
|
return c->describe(table_name).release();
|
|
}
|
|
|
|
bool session::table_exists(const std::string &table_name) const {
|
|
const auto c = cache_.pool().acquire();
|
|
if (!c.valid()) {
|
|
throw std::logic_error("no database connection available");
|
|
}
|
|
return c->exists(dialect_.default_schema_name(), table_name);
|
|
}
|
|
|
|
const class sql::dialect &session::dialect() const {
|
|
return dialect_;
|
|
}
|
|
|
|
// Writes a dump of the attached schema to the given output stream by
// delegating to the schema's own dump().
//
// os: stream receiving the dump
void session::dump_schema(std::ostream &os) const {
  schema_->dump(os);
}
|
|
|
|
// Acquires a statement for the given query context from the cache and
// fetches its result.
//
// ctx: compiled query context to execute
// Returns the fetched result implementation, or the failure reported by
// either the statement acquisition or the fetch.
utils::result<std::unique_ptr<sql::query_result_impl>, utils::error> session::fetch(const sql::query_context& ctx) const {
  const auto stmt = cache_.acquire(ctx);
  if (!stmt) {
    return utils::failure(stmt.err());
  }

  auto rows = stmt->fetch_internal();
  if (!rows) {
    return utils::failure(rows.err());
  }

  return rows;
}
|
|
|
|
// Acquires a statement for the given query context from the cache and
// executes it.
//
// ctx: compiled query context to execute
// Returns the value produced by the statement's execute(), or the failure
// reported by either the statement acquisition or the execution.
utils::result<size_t, utils::error> session::execute(const sql::query_context& ctx) const {
  const auto stmt = cache_.acquire(ctx);
  if (!stmt) {
    return utils::failure(stmt.err());
  }

  auto outcome = stmt->execute();
  if (!outcome) {
    return utils::failure(outcome.err());
  }

  return outcome;
}
|
|
|
|
// Obtains a statement for the given query context from the cache.
//
// ctx: compiled query context to prepare
// Returns the result of the cache's acquire() call unchanged.
utils::result<sql::statement, utils::error> session::prepare(const sql::query_context& ctx) {
  return cache_.acquire(ctx);
}
|
|
|
|
// Returns a copy of the SQL text stored in the given query context.
//
// ctx: compiled query context
std::string session::str(const sql::query_context& ctx) const {
  return ctx.sql;
}
|
|
|
|
// Assembles the SELECT query for an entity from the collected query data:
// selects data.columns from the root table, left-joins data.joins, applies
// the where clause and orders ascending by the root table's primary key
// column.
//
// data: entity query data; only its where_clause is moved from here, the
//       remaining members are read in place.
// Returns the fetchable query built by the query builder chain.
query::fetchable_query session::build_select_query(entity_query_data &&data) {
  return query::query::select(data.columns)
    .from(*data.root_table)
    .join_left(data.joins)
    .where(std::move(data.where_clause))
    .order_by(sql::column{data.root_table, data.pk_column_name})
    .asc();
}
|
|
}
|