src/StreamletAttributeFinder.h
branchv_0
changeset 70 018e2609f5bb
parent 62 a467e8cbd16b
child 96 c34106244a54
equal deleted inserted replaced
69:52f837fbb216 70:018e2609f5bb
    80 		} else {
    80 		} else {
    81 			throw RelpipeWriterException(L"Missing environment variable RELPIPE_IN_FILESYSTEM_STREAMLET_PATH → unable to find streamlet.");
    81 			throw RelpipeWriterException(L"Missing environment variable RELPIPE_IN_FILESYSTEM_STREAMLET_PATH → unable to find streamlet.");
    82 		}
    82 		}
    83 	}
    83 	}
    84 
    84 
       
    85 	void writeAttribute(RelationalWriter* writer, TypeId typeId, SubProcess::Message* m) {
       
    86 		if (m->parameters[1] == L"true") {
       
    87 			if (typeId == TypeId::BOOLEAN) writer->writeAttribute(L"false");
       
    88 			else if (typeId == TypeId::INTEGER)writer->writeAttribute(L"0");
       
    89 			else writer->writeAttribute(L""); // TODO: write acruall null values (when supported)
       
    90 		} else {
       
    91 			writer->writeAttribute(m->parameters[0]);
       
    92 		}
       
    93 	}
       
    94 
    85 protected:
    95 protected:
    86 
    96 
    87 	void startFile(const fs::path& file, const string& fileRaw, bool exists) override {
    97 	void startFile(const fs::path& file, const string& fileRaw, bool exists) override {
    88 		AttributeFinder::startFile(file, fileRaw, exists);
    98 		AttributeFinder::startFile(file, fileRaw, exists);
    89 		if (exists) {
    99 		if (exists) {
    96 
   106 
    97 	virtual void writeFieldOfExistingFile(RelationalWriter* writer, const string_t& relationName, const RequestedField & field) override {
   107 	virtual void writeFieldOfExistingFile(RelationalWriter* writer, const string_t& relationName, const RequestedField & field) override {
    98 		if (field.group == RequestedField::GROUP_STREAMLET) {
   108 		if (field.group == RequestedField::GROUP_STREAMLET) {
    99 			for (auto metadata : cachedMetadata[field.id]) {
   109 			for (auto metadata : cachedMetadata[field.id]) {
   100 				SubProcess::Message m = subProcesses[field.id]->read();
   110 				SubProcess::Message m = subProcesses[field.id]->read();
   101 				if (m.code == StreamletMsg::OUTPUT_ATTRIBUTE) writer->writeAttribute(m.parameters[0]);
   111 				if (m.code == StreamletMsg::OUTPUT_ATTRIBUTE) writeAttribute(writer, metadata.typeId, &m);
   102 				else throw RelpipeWriterException(L"Protocol violation from exec sub-process while reading: „" + metadata.attributeName + L"“. Expected OUTPUT_ATTRIBUTE but got: " + m.toString());
   112 				else throw RelpipeWriterException(L"Protocol violation from exec sub-process while reading: „" + metadata.attributeName + L"“. Expected OUTPUT_ATTRIBUTE but got: " + m.toString());
   103 			}
   113 			}
   104 
   114 
   105 			SubProcess::Message m = subProcesses[field.id]->read();
   115 			SubProcess::Message m = subProcesses[field.id]->read();
   106 			if (m.code != StreamletMsg::WAITING_FOR_INPUT_ATTRIBUTES) throw RelpipeWriterException(L"Protocol violation from exec sub-process. Expected WAITING_FOR_INPUT_ATTRIBUTES but got: " + m.toString());
   116 			if (m.code != StreamletMsg::WAITING_FOR_INPUT_ATTRIBUTES) throw RelpipeWriterException(L"Protocol violation from exec sub-process. Expected WAITING_FOR_INPUT_ATTRIBUTES but got: " + m.toString());