--- a/src/PosixMQCommand.cpp Wed Apr 20 20:59:42 2022 +0200
+++ b/src/PosixMQCommand.cpp Sat Apr 30 23:19:08 2022 +0200
@@ -55,7 +55,8 @@
{L"data", TypeId::STRING}
}, true);
- for (int i = configuration.messageCount; i > 0; i--) {
+ for (int i = configuration.messageCount; continueProcessing && i > 0; i--) {
+ // TODO: maybe rather call mq_timedreceive() inside and check continueProcessing (to be able to stop even when no messages are comming)
std::string message = mq->receive();
writer->writeAttribute(configuration.queue);
@@ -70,4 +71,4 @@
}
}
-}
\ No newline at end of file
+}
--- a/src/PosixMQCommand.h Wed Apr 20 20:59:42 2022 +0200
+++ b/src/PosixMQCommand.h Sat Apr 30 23:19:08 2022 +0200
@@ -20,6 +20,7 @@
#include <sstream>
#include <vector>
#include <memory>
+#include <atomic>
#include <relpipe/writer/TypeId.h>
@@ -32,11 +33,16 @@
class PosixMQCommand {
private:
std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings.
+ std::atomic<bool> continueProcessing{true};
public:
virtual ~PosixMQCommand();
void process(std::shared_ptr<writer::RelationalWriter> writer, Configuration& configuration);
+ void finish(int sig) {
+ continueProcessing = false;
+ }
+
};
}
--- a/src/relpipe-in-posixmq.cpp Wed Apr 20 20:59:42 2022 +0200
+++ b/src/relpipe-in-posixmq.cpp Sat Apr 30 23:19:08 2022 +0200
@@ -15,6 +15,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <cstdlib>
+#include <csignal>
#include <vector>
#include <memory>
#include <regex>
@@ -37,6 +38,12 @@
using namespace relpipe::writer;
using namespace relpipe::in::posixmq;
+static std::shared_ptr<PosixMQCommand> command = nullptr;
+
+void finish(int sig) {
+ if (command) command->finish(sig);
+}
+
int main(int argc, char** argv) {
setlocale(LC_ALL, "");
CLI::untieStdIO();
@@ -45,11 +52,14 @@
int resultCode = CLI::EXIT_CODE_UNEXPECTED_ERROR;
try {
+ signal(SIGHUP, finish);
+ signal(SIGINT, finish);
CLIParser cliParser;
Configuration configuration = cliParser.parse(cli.arguments());
- PosixMQCommand command;
+ command.reset(new PosixMQCommand());
std::shared_ptr<RelationalWriter> writer(Factory::create(std::cout));
- command.process(writer, configuration);
+ writer->setBufferingMode(BufferingMode::ENVIRONMENT, BufferingMode::RECORD);
+ command->process(writer, configuration);
resultCode = CLI::EXIT_CODE_SUCCESS;
} catch (RelpipeCLIException e) {
fwprintf(stderr, L"Caught CLI exception: %ls\n", e.getMessage().c_str());