parallel processing: optimize flush() + detect another protocol violation v_0
authorFrantišek Kučera <franta-hg@frantovo.cz>
Sun, 26 Jan 2020 21:35:02 +0100
branchv_0
changeset 62 a467e8cbd16b
parent 61 640ba8948d69
child 63 8c6885543e2c
parallel processing: optimize flush() + detect another protocol violation
src/StreamletAttributeFinder.h
src/SubProcess.cpp
--- a/src/StreamletAttributeFinder.h	Sat Jan 25 21:28:37 2020 +0100
+++ b/src/StreamletAttributeFinder.h	Sun Jan 26 21:35:02 2020 +0100
@@ -143,6 +143,7 @@
 						SubProcess::Message m = subProcess->read();
 						if (m.code == StreamletMsg::OUTPUT_ATTRIBUTE_METADATA) metadata.push_back({m.parameters[0], writer->toTypeId(m.parameters[1])});
 						else if (m.code == StreamletMsg::WAITING_FOR_INPUT_ATTRIBUTES) break;
+						else throw RelpipeWriterException(L"Protocol violation from exec sub-process while reading output attribute metadata. Expected OUTPUT_ATTRIBUTE_METADATA or WAITING_FOR_INPUT_ATTRIBUTES but got: " + m.toString());
 					}
 
 					cachedMetadata[field.id] = metadata;
--- a/src/SubProcess.cpp	Sat Jan 25 21:28:37 2020 +0100
+++ b/src/SubProcess.cpp	Sun Jan 26 21:35:02 2020 +0100
@@ -57,7 +57,6 @@
 	void write(string_t s) {
 		subInputWriter << convertor.to_bytes(s).c_str();
 		subInputWriter.put(SEPARATOR);
-		subInputWriter.flush();
 		if (subInputWriter.bad()) throw SubProcess::Exception(L"Unable to write to sub-process.");
 	}
 
@@ -65,6 +64,10 @@
 		write(std::to_wstring(i));
 	}
 
+	void flush() {
+		subInputWriter.flush();
+	}
+
 public:
 
 	/**
@@ -171,6 +174,7 @@
 		write(m.code);
 		write(m.parameters.size());
 		for (auto p : m.parameters) write(p);
+		flush();
 	}
 
 	int wait() {