src/PosixMQHandler.h
branchv_0
changeset 5 5a6828cfad41
parent 4 8a5b86415d80
child 7 68b93b82f1cc
equal deleted inserted replaced
4:8a5b86415d80 5:5a6828cfad41
    29 #include <relpipe/reader/handlers/RelationalReaderStringHandler.h>
    29 #include <relpipe/reader/handlers/RelationalReaderStringHandler.h>
    30 #include <relpipe/reader/handlers/AttributeMetadata.h>
    30 #include <relpipe/reader/handlers/AttributeMetadata.h>
    31 
    31 
    32 #include "PosixMQ.h"
    32 #include "PosixMQ.h"
    33 #include "Configuration.h"
    33 #include "Configuration.h"
       
    34 #include "Hex.h"
    34 
    35 
    35 namespace relpipe {
    36 namespace relpipe {
    36 namespace out {
    37 namespace out {
    37 namespace posixmq {
    38 namespace posixmq {
    38 
    39 
    47 		std::vector<relpipe::reader::handlers::AttributeMetadata> attributes;
    48 		std::vector<relpipe::reader::handlers::AttributeMetadata> attributes;
    48 		relpipe::common::type::Integer attributeIndex = 0;
    49 		relpipe::common::type::Integer attributeIndex = 0;
    49 		std::string currentValue;
    50 		std::string currentValue;
    50 	} currentRelation;
    51 	} currentRelation;
    51 
    52 
    52 	std::string formatText(relpipe::common::type::StringX value) {
       
    53 		// TODO: check valid UTF-8; if invalid, return only ASCII characters
       
    54 		return convertor.to_bytes(value);
       
    55 	}
       
    56 
       
    57 	std::string formatData(relpipe::common::type::StringX value) {
       
    58 		// TODO: decode HEX if type is string; return original octets if type is octet-string (not yet implemented)
       
    59 		return convertor.to_bytes(value);
       
    60 	}
       
    61 
       
    62 public:
    53 public:
    63 
    54 
    64 	PosixMQHandler(Configuration configuration) : configuration(configuration) {
    55 	PosixMQHandler(Configuration configuration) : configuration(configuration) {
    65 		mq.reset(PosixMQ::open(convertor.to_bytes(configuration.queue), configuration.unlinkOnClose));
    56 		mq.reset(PosixMQ::open(convertor.to_bytes(configuration.queue), configuration.unlinkOnClose));
    66 	}
    57 	}
    70 	}
    61 	}
    71 
    62 
    72 	void attribute(const relpipe::common::type::StringX& value) override {
    63 	void attribute(const relpipe::common::type::StringX& value) override {
    73 
    64 
    74 		auto attributeName = currentRelation.attributes[currentRelation.attributeIndex].getAttributeName();
    65 		auto attributeName = currentRelation.attributes[currentRelation.attributeIndex].getAttributeName();
    75 		if (attributeName == L"text") currentRelation.currentValue = formatText(value);
    66 		if (attributeName == L"text") currentRelation.currentValue = convertor.to_bytes(value);
    76 		else if (attributeName == L"data") currentRelation.currentValue = formatData(value);
    67 		else if (attributeName == L"data") currentRelation.currentValue = Hex::fromHex(value).str();
    77 
    68 
    78 		currentRelation.attributeIndex++;
    69 		currentRelation.attributeIndex++;
    79 		if (currentRelation.attributeIndex == currentRelation.attributes.size()) {
    70 		if (currentRelation.attributeIndex == currentRelation.attributes.size()) {
    80 			currentRelation.attributeIndex = 0;
    71 			currentRelation.attributeIndex = 0;
    81 			mq->send(currentRelation.currentValue);
    72 			mq->send(currentRelation.currentValue);