/**
* Relational pipes
* Copyright © 2022 František Kučera (Frantovo.cz, GlobalCode.info)
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, version 3 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <memory>
#include <string>
#include <vector>
#include <utility>
#include <codecvt>
#include <regex>
#include <stdexcept>
#include <mutex>
#include <shared_mutex>
#include <iomanip>
#include <uuid/uuid.h>
#include <relpipe/common/type/typedefs.h>
#include <relpipe/reader/TypeId.h>
#include <relpipe/reader/handlers/RelationalReaderStringHandler.h>
#include <relpipe/reader/handlers/AttributeMetadata.h>
#include <relpipe/writer/Factory.h>
#include <relpipe/cli/RelpipeCLIException.h>
#include "Configuration.h"
#include "HTTPServer.h"
#include "Hex.h"
namespace relpipe {
namespace tr {
namespace httpd {
class HttpdHandler : public relpipe::reader::handlers::RelationalReaderStringHandler {
private:
std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encoding than UTF-8
shared_ptr<relpipe::writer::RelationalWriter> relationalWriter;
std::mutex relationalWriterMutex;
Configuration configuration;
std::shared_ptr<HTTPServer> httpServer;
relpipe::common::type::StringX currentRelationName;
std::vector<relpipe::reader::handlers::AttributeMetadata> currentReaderMetadata;
size_t currentAttributeIndex = 0;
size_t currentRecordNumber = 1;
class RequestMatcher {
public:
std::regex method = std::regex(".*");
std::regex url = std::regex(".*");
virtual ~RequestMatcher() = default;
bool matches(const std::string& method, const std::string& url) const {
bool result = true;
result &= std::regex_match(method, this->method);
result &= std::regex_match(url, this->url);
return result;
}
};
/**
 * Name and value of one HTTP header that should be added to a response.
 */
class HeaderTemplate {
public:
    std::string name;
    std::string value;

    /** Creates a template with an empty name and an empty value. */
    HeaderTemplate() = default;

    /**
     * @param name header name
     * @param value header value
     */
    HeaderTemplate(std::string name, std::string value) : name(std::move(name)), value(std::move(value)) {
        // The parameters are sinks: they are moved into the members instead of being copied a second time.
    }

    virtual ~HeaderTemplate() = default;
};
/**
 * Header template combined with a request matcher: the header is meant only
 * for requests whose method and URL match.
 */
class GlobalHeaderTemplate : public HeaderTemplate, public RequestMatcher {
};
/** Record of the header_template relation that is currently being assembled. */
GlobalHeaderTemplate headerTemplate;
/** Completed header templates; modified and read under templatesMutex. */
std::vector<GlobalHeaderTemplate> headerTemplates;
/**
 * Prepared response (status code, body and headers) for the requests
 * selected by the inherited matcher.
 */
class ResponseTemplate : public RequestMatcher {
public:
    std::string body;
    uint16_t code = 200;
    std::vector<HeaderTemplate> headers;
};
/** Record of the response_template relation that is currently being assembled. */
ResponseTemplate responseTemplate;
/** Completed response templates; modified and read under templatesMutex. */
std::vector<ResponseTemplate> responseTemplates;
std::mutex templatesMutex; // TODO: read-write lock (for responseTemplates and headerTemplates)
class RequestHandler : public HTTPServer::RequestHandler {
private:
shared_ptr<relpipe::writer::RelationalWriter> relationalWriter;
std::mutex* relationalWriterMutex;
std::vector<ResponseTemplate>* responseTemplates;
std::vector<GlobalHeaderTemplate>* headerTemplates;
std::mutex* templatesMutex;
/**
 * Writes one HTTP exchange to the relational output as the relations
 * exchange, header, cookie, get_parameter and post_parameter.
 * All records of the exchange carry the same freshly generated exchange id.
 * The writer is used under relationalWriterMutex from the first startRelation()
 * to the last record.
 *
 * @param request the handled request
 * @param response the response produced for the request
 */
void writeExchange(const HTTPServer::Request& request, const HTTPServer::Response& response) {
    std::wstring_convert<std::codecvt_utf8<wchar_t>> convertor; // TODO: support also other encoding than UTF-8

    // Values shared by many records are converted only once and before the lock is taken:
    // the critical section is shorter and a conversion failure of the URL or method
    // cannot leave a half-written record in the output.
    const auto exchangeId = convertor.from_bytes(generateExchangeId());
    const auto url = convertor.from_bytes(request.url);
    const auto method = convertor.from_bytes(request.method);
    relpipe::common::type::Integer requestSize = request.body.size();
    relpipe::common::type::Integer responseSize = response.body.size();

    std::lock_guard<std::mutex> lock(*relationalWriterMutex);

    // NOTE(review): from_bytes() throws std::range_error on input that is not valid UTF-8;
    // names and values below are still converted while a record is being written.

    // Writes one record (exchange, url, direction, name, value) per name-value pair.
    auto writeDirectedPairs = [&](const relpipe::common::type::StringX& direction, const auto& pairs) {
        for (const HTTPServer::AVP& pair : pairs) {
            relationalWriter->writeAttribute(exchangeId);
            relationalWriter->writeAttribute(url);
            relationalWriter->writeAttribute(direction);
            relationalWriter->writeAttribute(convertor.from_bytes(pair.name));
            relationalWriter->writeAttribute(convertor.from_bytes(pair.value));
        }
    };

    // Writes one record (exchange, url, name, value) per name-value pair.
    auto writePairs = [&](const auto& pairs) {
        for (const HTTPServer::AVP& pair : pairs) {
            relationalWriter->writeAttribute(exchangeId);
            relationalWriter->writeAttribute(url);
            relationalWriter->writeAttribute(convertor.from_bytes(pair.name));
            relationalWriter->writeAttribute(convertor.from_bytes(pair.value));
        }
    };

    // TODO: multiple modes:
    // a) interleaved (current)
    // b) write exchanges immediately + cache headers and flush them at the end
    // TODO: data types
    // TODO: support multiple encodings
    relationalWriter->startRelation(L"exchange",{
        // TODO: timestamp
        // TODO: ordinal number?
        {L"id", relpipe::writer::TypeId::STRING},
        {L"url", relpipe::writer::TypeId::STRING},
        {L"method", relpipe::writer::TypeId::STRING},
        {L"request_text", relpipe::writer::TypeId::STRING},
        {L"request_data", relpipe::writer::TypeId::STRING},
        {L"request_size", relpipe::writer::TypeId::INTEGER},
        {L"response_text", relpipe::writer::TypeId::STRING},
        {L"response_data", relpipe::writer::TypeId::STRING},
        {L"response_size", relpipe::writer::TypeId::INTEGER},
        {L"response_code", relpipe::writer::TypeId::INTEGER},
    }, true);
    relationalWriter->writeAttribute(exchangeId);
    relationalWriter->writeAttribute(url);
    relationalWriter->writeAttribute(method);
    relationalWriter->writeAttribute(Hex::toTxt(request.body));
    relationalWriter->writeAttribute(Hex::toHex(request.body));
    relationalWriter->writeAttribute(&requestSize, typeid (requestSize));
    relationalWriter->writeAttribute(Hex::toTxt(response.body));
    relationalWriter->writeAttribute(Hex::toHex(response.body));
    relationalWriter->writeAttribute(&responseSize, typeid (responseSize));
    relationalWriter->writeAttribute(std::to_wstring(response.code));

    relationalWriter->startRelation(L"header",{
        // TODO: ordinal number?
        {L"exchange", relpipe::writer::TypeId::STRING},
        {L"url", relpipe::writer::TypeId::STRING},
        {L"direction", relpipe::writer::TypeId::STRING},
        {L"name", relpipe::writer::TypeId::STRING},
        {L"value", relpipe::writer::TypeId::STRING},
    }, true);
    writeDirectedPairs(L"request", request.header);
    writeDirectedPairs(L"response", response.header);

    relationalWriter->startRelation(L"cookie",{
        // TODO: ordinal number?
        {L"exchange", relpipe::writer::TypeId::STRING},
        {L"url", relpipe::writer::TypeId::STRING},
        {L"direction", relpipe::writer::TypeId::STRING},
        {L"name", relpipe::writer::TypeId::STRING},
        {L"value", relpipe::writer::TypeId::STRING},
    }, true);
    writeDirectedPairs(L"request", request.cookie);
    writeDirectedPairs(L"response", response.cookie);

    relationalWriter->startRelation(L"get_parameter",{
        // TODO: ordinal number?
        {L"exchange", relpipe::writer::TypeId::STRING},
        {L"url", relpipe::writer::TypeId::STRING},
        {L"name", relpipe::writer::TypeId::STRING},
        {L"value", relpipe::writer::TypeId::STRING},
    }, true);
    writePairs(request.getParameter);

    relationalWriter->startRelation(L"post_parameter",{
        // TODO: ordinal number?
        {L"exchange", relpipe::writer::TypeId::STRING},
        {L"url", relpipe::writer::TypeId::STRING},
        {L"name", relpipe::writer::TypeId::STRING},
        {L"value", relpipe::writer::TypeId::STRING},
    }, true);
    writePairs(request.postParameter);
}
std::string generateExchangeId() {
char buffer[37];
uuid_t uuid;
uuid_generate_random(uuid);
// uuid_generate_time(uuid);
uuid_unparse_lower(uuid, buffer);
return buffer;
}
public:
RequestHandler(shared_ptr<relpipe::writer::RelationalWriter> relationalWriter, std::mutex* relationalWriterMutex, std::vector<ResponseTemplate>* responseTemplates, std::vector<GlobalHeaderTemplate>* headerTemplates, std::mutex* templatesMutex) :
relationalWriter(relationalWriter), relationalWriterMutex(relationalWriterMutex), responseTemplates(responseTemplates), headerTemplates(headerTemplates), templatesMutex(templatesMutex) {
}
virtual ~RequestHandler() = default;
/**
 * Builds the response from the first response template that matches the request
 * and writes the whole exchange to the relational output.
 * If no template matches, an HTTP 404 response is returned (and written as well).
 *
 * @param request the received request
 * @return the response for the client
 */
const HTTPServer::Response handle(const HTTPServer::Request& request) override {
    HTTPServer::Response response;
    response.code = 404;
    response.body = "<h1>HTTP 404: Not Found</h1><p>(no response template matched)</p>";

    {
        // The templates are locked only while they are being read;
        // the lock is released before the exchange is written to the output.
        std::lock_guard<std::mutex> lock(*templatesMutex);
        // Iterate by reference: copying a template would copy two regexes, the body and all headers.
        for (const ResponseTemplate& t : *responseTemplates) {
            if (!t.matches(request.method, request.url)) continue;
            response.code = t.code;
            response.body = t.body;
            // TODO: replace global header values with request-specific ones instead of appending?
            for (const GlobalHeaderTemplate& h : *headerTemplates) {
                if (h.matches(request.method, request.url)) response.header.push_back({h.name, h.value});
            }
            for (const HeaderTemplate& h : t.headers) response.header.push_back({h.name, h.value});
            break; // only the first matching template is used
        }
    }

    writeExchange(request, response);
    return response;
}
};
// Created by a default member initializer: it copies relationalWriter, so it relies on
// relationalWriter being declared (and therefore initialised) before this member.
// The handler keeps raw pointers to the mutexes and template vectors of this object.
// NOTE(review): the handler is also passed to httpServer in the constructor; presumably
// this object has to outlive the request processing – confirm.
std::shared_ptr<RequestHandler> requestHandler = std::make_shared<RequestHandler>(relationalWriter, &relationalWriterMutex, &responseTemplates, &headerTemplates, &templatesMutex);
/**
 * @return prefix of the attribute names that define response headers
 */
relpipe::common::type::StringX getHeaderAttributePrefix() {
    // might be configurable - parametrized
    return relpipe::common::type::StringX(L"header.");
}
/**
 * @param attributeName name of an attribute
 * @return true if the name starts with the header attribute prefix
 */
bool isHeaderAttribute(const relpipe::common::type::StringX& attributeName) {
    const relpipe::common::type::StringX prefix = getHeaderAttributePrefix();
    // compare() yields 0 only if the name is at least as long as the prefix and begins with it
    return attributeName.compare(0, prefix.size(), prefix) == 0;
}
/**
 * @param attributeName attribute name that starts with the header attribute prefix
 * @return the part of the attribute name that follows the prefix
 */
relpipe::common::type::StringX fetchHeaderName(const relpipe::common::type::StringX& attributeName) {
    // TODO: recognize several modes: header.set.*, header.add.*, header.remove.*
    const auto prefixLength = getHeaderAttributePrefix().size();
    return attributeName.substr(prefixLength);
}
/**
 * Processes one attribute value of the header_template relation.
 * Supported attributes: url and method (regular expressions, an empty value means ".*"),
 * name and value. Once the last attribute of a record has been received,
 * the assembled template is appended to headerTemplates under templatesMutex.
 *
 * @param value value of the attribute at currentAttributeIndex
 * @throws std::invalid_argument if the attribute is not supported
 */
void headerTemplateAttribute(const relpipe::common::type::StringX& value) {
    const auto attributeName = currentReaderMetadata[currentAttributeIndex].getAttributeName();

    if (attributeName == L"url") headerTemplate.url = std::regex(value.size() ? convertor.to_bytes(value) : ".*");
    else if (attributeName == L"method") headerTemplate.method = std::regex(value.size() ? convertor.to_bytes(value) : ".*");
    else if (attributeName == L"name") headerTemplate.name = convertor.to_bytes(value);
    else if (attributeName == L"value") headerTemplate.value = convertor.to_bytes(value); // TODO: header encoding?
    // The message names the relation that is actually processed here (it used to say request_template).
    else throw std::invalid_argument("Unsupported attribute in the header_template relation: " + convertor.to_bytes(attributeName + L" = " + value));

    currentAttributeIndex++;
    if (currentAttributeIndex == currentReaderMetadata.size()) {
        // the record is complete: publish it and start a new one
        std::lock_guard<std::mutex> lock(templatesMutex);
        currentAttributeIndex = 0;
        headerTemplates.push_back(headerTemplate);
        headerTemplate = GlobalHeaderTemplate();
    }
}
/**
 * Processes one attribute value of the response_template relation.
 * Supported attributes: url and method (regular expressions, an empty value means ".*"),
 * code, text (body as text), data (body in hexadecimal form) and attributes with the
 * header prefix (response headers). An empty text or data value leaves the body untouched,
 * so a body set by the other attribute is kept. Once the last attribute of a record has been
 * received, the assembled template is appended to responseTemplates under templatesMutex.
 *
 * @param value value of the attribute at currentAttributeIndex
 * @throws std::invalid_argument if the attribute is not supported or the code is not a number
 * @throws std::out_of_range if the code does not fit into the 16-bit response code
 */
void responseTemplateAttribute(const relpipe::common::type::StringX& value) {
    const auto attributeName = currentReaderMetadata[currentAttributeIndex].getAttributeName();

    if (attributeName == L"url") {
        responseTemplate.url = std::regex(value.size() ? convertor.to_bytes(value) : ".*");
    } else if (attributeName == L"method") {
        responseTemplate.method = std::regex(value.size() ? convertor.to_bytes(value) : ".*");
    } else if (attributeName == L"code") {
        const int code = std::stoi(value);
        // Reject values that would silently wrap around when stored in the uint16_t field.
        if (code < 0 || code > 0xFFFF) throw std::out_of_range("Response code out of range in the response_template relation: " + convertor.to_bytes(value));
        responseTemplate.code = static_cast<uint16_t>(code);
    } else if (attributeName == L"text") {
        if (value.size()) responseTemplate.body = convertor.to_bytes(value); // else: keep empty or value from 'data'
    } else if (attributeName == L"data") {
        if (value.size()) responseTemplate.body = Hex::fromHex(value).str(); // else: keep empty or value from 'text'
    } else if (isHeaderAttribute(attributeName)) {
        responseTemplate.headers.push_back(HeaderTemplate(convertor.to_bytes(fetchHeaderName(attributeName)), convertor.to_bytes(value))); // TODO: header encoding?
    } else {
        throw std::invalid_argument("Unsupported attribute in the response_template relation: " + convertor.to_bytes(attributeName + L" = " + value));
    }

    currentAttributeIndex++;
    if (currentAttributeIndex == currentReaderMetadata.size()) {
        // the record is complete: publish it and start a new one
        std::lock_guard<std::mutex> lock(templatesMutex);
        currentAttributeIndex = 0;
        responseTemplates.push_back(responseTemplate);
        responseTemplate = ResponseTemplate();
    }
}
public:
HttpdHandler(shared_ptr<relpipe::writer::RelationalWriter> relationalWriter, Configuration configuration, std::shared_ptr<HTTPServer> httpServer) : relationalWriter(relationalWriter), configuration(configuration), httpServer(httpServer) {
httpServer->setRequestHandler(requestHandler);
}
virtual ~HttpdHandler() {
}
/**
 * Remembers the name and the attribute metadata of the relation that is about to be received.
 *
 * @param name name of the relation
 * @param attributes metadata of the attributes of the relation
 */
void startRelation(relpipe::common::type::StringX name, std::vector<relpipe::reader::handlers::AttributeMetadata> attributes) override {
    // The parameters are passed by value, so they can be moved instead of copied once more.
    currentRelationName = std::move(name);
    currentReaderMetadata = std::move(attributes);
}
/**
 * Passes the attribute value to the processing of the current relation.
 *
 * @param value attribute value
 * @throws std::invalid_argument if the current relation is neither header_template nor response_template
 */
void attribute(const relpipe::common::type::StringX& value) override {
    if (currentRelationName == L"header_template") {
        headerTemplateAttribute(value);
        return;
    }
    if (currentRelationName == L"response_template") {
        responseTemplateAttribute(value);
        return;
    }
    throw std::invalid_argument("Unsupported relation: " + convertor.to_bytes(currentRelationName));
}
/**
 * Called when the input ends; waits for a fixed time before returning.
 */
void endOfPipe() {
    const unsigned int waitSeconds = 60;
    sleep(waitSeconds); // FIXME: run for configured interval or number of requests or forever (until some stop signal)
}
};
}
}
}