src/StreamletAttributeFinder.h
branchv_0
changeset 35 926eb93c302f
parent 32 bccda5688d71
child 45 f466b4c7d9b1
equal deleted inserted replaced
34:0b9e4af08cc8 35:926eb93c302f
    48 		return SCRIPT_PREFIX + field.name;
    48 		return SCRIPT_PREFIX + field.name;
    49 	}
    49 	}
    50 
    50 
    51 protected:
    51 protected:
    52 
    52 
    53 	virtual void writeFieldOfExistingFile(RelationalWriter* writer, const string_t& relationName, const RequestedField& field) override {
    53 	void startFile(const fs::path& file, const string& fileRaw, bool exists) override {
       
    54 		AttributeFinder::startFile(file, fileRaw, exists);
       
    55 		if (exists) {
       
    56 			for (auto subProcess : subProcesses) {
       
    57 				subProcess.second->write({StreamletMsg::INPUT_ATTRIBUTE, L"0", convertor.from_bytes(currentFileRaw), L"false"}); // index, value, isNull
       
    58 				subProcess.second->write({StreamletMsg::WAITING_FOR_OUTPUT_ATTRIBUTES});
       
    59 			}
       
    60 		}
       
    61 	}
       
    62 
       
    63 	virtual void writeFieldOfExistingFile(RelationalWriter* writer, const string_t& relationName, const RequestedField & field) override {
    54 		// TODO: paralelize also over records → fork multiple processes and distribute records across them; then collect results (with a lock)
    64 		// TODO: paralelize also over records → fork multiple processes and distribute records across them; then collect results (with a lock)
    55 		if (field.group == RequestedField::GROUP_STREAMLET) {
    65 		if (field.group == RequestedField::GROUP_STREAMLET) {
    56 
       
    57 			subProcesses[field.id]->write({StreamletMsg::INPUT_ATTRIBUTE, L"0", convertor.from_bytes(currentFileRaw), L"false"}); // index, value, isNull
       
    58 			subProcesses[field.id]->write({StreamletMsg::WAITING_FOR_OUTPUT_ATTRIBUTES});
       
    59 
       
    60 			for (auto metadata : cachedMetadata[field.id]) {
    66 			for (auto metadata : cachedMetadata[field.id]) {
    61 				SubProcess::Message m = subProcesses[field.id]->read();
    67 				SubProcess::Message m = subProcesses[field.id]->read();
    62 				if (m.code == StreamletMsg::OUTPUT_ATTRIBUTE) writer->writeAttribute(m.parameters[0]);
    68 				if (m.code == StreamletMsg::OUTPUT_ATTRIBUTE) writer->writeAttribute(m.parameters[0]);
    63 				else throw RelpipeWriterException(L"Protocol violation from exec sub-process while reading: „" + metadata.attributeName + L"“. Expected OUTPUT_ATTRIBUTE but got: " + m.toString());
    69 				else throw RelpipeWriterException(L"Protocol violation from exec sub-process while reading: „" + metadata.attributeName + L"“. Expected OUTPUT_ATTRIBUTE but got: " + m.toString());
    64 			}
    70 			}