src/HTTPHandler.h
author František Kučera <franta-hg@frantovo.cz>
Tue, 22 Mar 2022 22:08:03 +0100
branch v_0
changeset 19 0fc76872a921
parent 18 d8efcefdf906
child 20 cad9f6d421ee
permissions -rw-r--r--
support request ID (for JOINing requests + responses + response headers)

/**
 * Relational pipes
 * Copyright © 2022 František Kučera (Frantovo.cz, GlobalCode.info)
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, version 3 of the License.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#pragma once

#include <memory>
#include <string>
#include <vector>
#include <codecvt>
#include <regex>
#include <locale>
#include <stdexcept>

#include <curl/curl.h>

#include <relpipe/common/type/typedefs.h>
#include <relpipe/reader/TypeId.h>
#include <relpipe/reader/handlers/RelationalReaderStringHandler.h>
#include <relpipe/reader/handlers/AttributeMetadata.h>

#include <relpipe/writer/Factory.h>
#include <relpipe/writer/TypeId.h>

#include <relpipe/cli/RelpipeCLIException.h>

#include "Configuration.h"
#include "HTTPClient.h"

namespace relpipe {
namespace tr {
namespace http {

class HTTPHandler : public relpipe::reader::handlers::RelationalReaderStringHandler {
private:

	class HeaderDefinition {
	public:
		std::wregex url = std::wregex(L".*");
		relpipe::common::type::StringX name;
		relpipe::common::type::StringX value;

		bool matches(const relpipe::common::type::StringX& url) const {
			return std::regex_match(url, this->url);
		}
	};

	std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings.
	shared_ptr<relpipe::writer::RelationalWriter> relationalWriter;
	Configuration configuration;
	relpipe::common::type::StringX currentRelationName;
	std::vector<relpipe::reader::handlers::AttributeMetadata> currentReaderMetadata;
	std::vector<relpipe::writer::AttributeMetadata> currentWriterMetadata;
	HeaderDefinition requestHeader;
	relpipe::common::type::StringX requestId;
	HTTPClient::Request request;
	std::vector<HeaderDefinition> requestHeaders;
	std::vector<relpipe::common::type::StringX> responseHeaders;
	size_t currentAttributeIndex = 0;
	size_t currentRecordNumber = 1;

	void writeHeaders() {
		if (responseHeaders.size()) {
			relationalWriter->startRelation(L"header",{
				{L"request", relpipe::writer::TypeId::STRING},
				{L"url", relpipe::writer::TypeId::STRING},
				{L"name", relpipe::writer::TypeId::STRING},
				{L"value", relpipe::writer::TypeId::STRING},
			}, true);

			for (auto s : responseHeaders) relationalWriter->writeAttribute(s);

			responseHeaders.clear();
		}
	}

	HTTPClient::Method parseMethod(const relpipe::common::type::StringX& value) {
		if (value.size() == 0) return HTTPClient::Method::GET;
		else if (value == L"GET") return HTTPClient::Method::GET;
		else if (value == L"HEAD") return HTTPClient::Method::HEAD;
		else if (value == L"POST") return HTTPClient::Method::POST;
		else if (value == L"PUT") return HTTPClient::Method::PUT;
		else if (value == L"DELETE") return HTTPClient::Method::DELETE;
		else if (value == L"PATCH") return HTTPClient::Method::PATCH;
		else throw std::invalid_argument("Unsupported HTTP method: " + convertor.to_bytes(value));
	}

	relpipe::common::type::StringX getHeaderAttributePrefix() {
		// might be configurable - parametrized
		return L"header.";
	}

	bool isHeaderAttribute(const relpipe::common::type::StringX& attributeName) {
		return attributeName.rfind(getHeaderAttributePrefix(), 0) == 0;
	}

	relpipe::common::type::StringX fetchHeaderName(const relpipe::common::type::StringX& attributeName) {
		return attributeName.substr(getHeaderAttributePrefix().size());
	}

	void appendRequestHeader(const relpipe::common::type::StringX& name, const relpipe::common::type::StringX& value) {
		request.headers.push_back(convertor.to_bytes(name));
		request.headers.push_back(convertor.to_bytes(value));
	}

public:

	HTTPHandler(shared_ptr<relpipe::writer::RelationalWriter> relationalWriter, Configuration configuration) : relationalWriter(relationalWriter), configuration(configuration) {
	}

	virtual ~HTTPHandler() {
	}

	void startRelation(relpipe::common::type::StringX name, std::vector<relpipe::reader::handlers::AttributeMetadata> attributes) override {
		writeHeaders(); // from previous relation

		currentRelationName = name;
		currentReaderMetadata = attributes;
		currentAttributeIndex = 0;

		if (currentRelationName == L"header") {
			// TODO: analyze header attributes
		} else if (currentRelationName == L"request") {
			relationalWriter->startRelation(L"response",{
				// TODO: body in hexadecimal/binary format
				{L"request", relpipe::writer::TypeId::STRING},
				{L"url", relpipe::writer::TypeId::STRING},
				{L"body", relpipe::writer::TypeId::STRING},
				{L"code", relpipe::writer::TypeId::INTEGER},
			}, true);
		}
	}

	void attribute(const relpipe::common::type::StringX& value) override {
		if (currentRelationName == L"header") headerAttribute(value);
		else if (currentRelationName == L"request") requestAttribute(value);
		else throw std::invalid_argument("Unsupported relation: " + convertor.to_bytes(currentRelationName));
	}

private:

	void headerAttribute(const relpipe::common::type::StringX& value) {
		auto attributeName = currentReaderMetadata[currentAttributeIndex].getAttributeName();

		if (attributeName == L"name") requestHeader.name = value;
		else if (attributeName == L"value") requestHeader.value = value;
		else if (attributeName == L"url") requestHeader.url = std::wregex(value.size() ? value : L".*"); // TODO: null instead of empty value (when supported)
		else throw std::invalid_argument("Unsupported attribute in the header relation: " + convertor.to_bytes(attributeName + L" = " + value));

		currentAttributeIndex++;

		if (currentAttributeIndex % currentReaderMetadata.size() == 0) {
			currentAttributeIndex = 0;
			requestHeaders.push_back(requestHeader);
			requestHeader = HeaderDefinition();
		}

	}

	void requestAttribute(const relpipe::common::type::StringX& value) {
		auto attributeName = currentReaderMetadata[currentAttributeIndex].getAttributeName();

		if (attributeName == L"id") requestId = value;
		else if (attributeName == L"url") request.url = convertor.to_bytes(value);
		else if (attributeName == L"method") request.method = parseMethod(value);
		else if (isHeaderAttribute(attributeName)) appendRequestHeader(fetchHeaderName(attributeName), value);
		else throw std::invalid_argument("Unsupported attribute in the header relation: " + convertor.to_bytes(attributeName + L" = " + value));

		currentAttributeIndex++;

		if (currentAttributeIndex % currentReaderMetadata.size() == 0) {
			currentAttributeIndex = 0;
			std::shared_ptr<HTTPClient> http(HTTPClient::open());

			for (const HeaderDefinition& h : requestHeaders) if (h.matches(convertor.from_bytes(request.url))) appendRequestHeader(h.name, h.value);

			std::string body;
			relpipe::common::type::Integer responseCode = -1;

			try {
				HTTPClient::Response response = http->exchange(request);
				responseCode = response.responseCode;
				body = response.body;

				for (size_t i = 0; i < response.headers.size(); i += 2) {
					responseHeaders.push_back(requestId);
					responseHeaders.push_back(convertor.from_bytes(request.url));
					responseHeaders.push_back(convertor.from_bytes(response.headers[i]));
					responseHeaders.push_back(convertor.from_bytes(response.headers[i + 1]));
				}
			} catch (const HTTPClient::Exception& e) {
				body = e.getFullMessage();
				// TODO: move error message into separate attribute?
			}

			relationalWriter->writeAttribute(requestId);
			relationalWriter->writeAttribute(convertor.from_bytes(request.url));
			relationalWriter->writeAttribute(convertor.from_bytes(body));
			relationalWriter->writeAttribute(&responseCode, typeid (responseCode));

			request = HTTPClient::Request();
			requestId.clear();
		}
	}

public:

	void endOfPipe() {
		writeHeaders(); // from last relation
	}

};

}
}
}