add signal handler + optional stop v_0
authorFrantišek Kučera <franta-hg@frantovo.cz>
Sat, 30 Apr 2022 23:19:08 +0200
branchv_0
changeset 7 12e975f807ed
parent 6 65abb0376a0d
child 8 04550f271623
add signal handler + optional stop
src/PosixMQCommand.cpp
src/PosixMQCommand.h
src/relpipe-in-posixmq.cpp
--- a/src/PosixMQCommand.cpp	Wed Apr 20 20:59:42 2022 +0200
+++ b/src/PosixMQCommand.cpp	Sat Apr 30 23:19:08 2022 +0200
@@ -55,7 +55,8 @@
 		{L"data", TypeId::STRING}
 	}, true);
 
-	for (int i = configuration.messageCount; i > 0; i--) {
+	for (int i = configuration.messageCount; continueProcessing && i > 0; i--) {
+		// TODO: maybe rather call mq_timedreceive() inside and check continueProcessing (to be able to stop even when no messages are comming)
 		std::string message = mq->receive();
 
 		writer->writeAttribute(configuration.queue);
@@ -70,4 +71,4 @@
 
 }
 }
-}
\ No newline at end of file
+}
--- a/src/PosixMQCommand.h	Wed Apr 20 20:59:42 2022 +0200
+++ b/src/PosixMQCommand.h	Sat Apr 30 23:19:08 2022 +0200
@@ -20,6 +20,7 @@
 #include <sstream>
 #include <vector>
 #include <memory>
+#include <atomic>
 
 #include <relpipe/writer/TypeId.h>
 
@@ -32,11 +33,16 @@
 class PosixMQCommand {
 private:
 	std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings.
+	std::atomic<bool> continueProcessing{true};
 public:
 	virtual ~PosixMQCommand();
 
 	void process(std::shared_ptr<writer::RelationalWriter> writer, Configuration& configuration);
 
+	void finish(int sig) {
+		continueProcessing = false;
+	}
+
 };
 
 }
--- a/src/relpipe-in-posixmq.cpp	Wed Apr 20 20:59:42 2022 +0200
+++ b/src/relpipe-in-posixmq.cpp	Sat Apr 30 23:19:08 2022 +0200
@@ -15,6 +15,7 @@
  * along with this program. If not, see <http://www.gnu.org/licenses/>.
  */
 #include <cstdlib>
+#include <csignal>
 #include <vector>
 #include <memory>
 #include <regex>
@@ -37,6 +38,12 @@
 using namespace relpipe::writer;
 using namespace relpipe::in::posixmq;
 
+static std::shared_ptr<PosixMQCommand> command = nullptr;
+
+void finish(int sig) {
+	if (command) command->finish(sig);
+}
+
 int main(int argc, char** argv) {
 	setlocale(LC_ALL, "");
 	CLI::untieStdIO();
@@ -45,11 +52,14 @@
 	int resultCode = CLI::EXIT_CODE_UNEXPECTED_ERROR;
 
 	try {
+		signal(SIGHUP, finish);
+		signal(SIGINT, finish);
 		CLIParser cliParser;
 		Configuration configuration = cliParser.parse(cli.arguments());
-		PosixMQCommand command;
+		command.reset(new PosixMQCommand());
 		std::shared_ptr<RelationalWriter> writer(Factory::create(std::cout));
-		command.process(writer, configuration);
+		writer->setBufferingMode(BufferingMode::ENVIRONMENT, BufferingMode::RECORD);
+		command->process(writer, configuration);
 		resultCode = CLI::EXIT_CODE_SUCCESS;
 	} catch (RelpipeCLIException e) {
 		fwprintf(stderr, L"Caught CLI exception: %ls\n", e.getMessage().c_str());