Protobuf вызывает ошибку сегментации на ParseFromIstream



Я пытаюсь расширить свои знания в области программирования, и я пытаюсь сделать несколько многопроцессорных программ.



Я хотел бы сделать следующее: на одном хосте выполняется несколько исполняемых файлов. Один из исполняемых файлов отвечает за сканирование файловой системы, другой-за обработку данных и т. д.



Однако некоторые данные должны быть переданы за пределы хоста. Чтобы ограничить такие вещи, как настройки сетевого брандмауэра, я хотел бы иметь один демон (многопоточный), чтобы получите данные через IPC, прежде чем отправлять их дальше на внешний хост, используя еще не определенную реализацию сокета.



После проведения большого количества поисков и исследований наиболее очевидным паттерном для использования является паттерн потребитель / производитель, с многопроцессными производителями (демоны, производящие сообщения) и многопоточным потребителем (получающим данные, предпочтительно через общую память, и отправляющим их на внешний хост).

Я хочу, чтобы мое приложение было в состоянии работать кросс-платформенно как можно больше по возможности. Для этого я использую boost::interprocess:message_queue. Поскольку эта библиотека Boost принимает только двоичные сериализованные объекты, я использую Google Protobuf для обработки сериализации и десериализации.



Я создал 2 исполняемых файла, которые в настоящее время называются "потребитель" и "производитель". Производитель отправляет сообщение через очередь сообщений потребителю, который, в свою очередь, десериализует его. Приведенный ниже код работает при передаче простых объектов типа " int " (что, на мой взгляд, означает, что очередь сообщений связь работает), но не работает при использовании данных из SerializeToOstream().

Как вы, возможно, заметили, я новичок в IPC и многопроцессорном программировании, но я считаю, что сделал свою домашнюю работу.

Здесь является моим продюсером.cpp:



#include <iostream>
#include <chrono>
#include <thread>
#include <fstream>

#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>

#include <boost/thread.hpp>

#include <internal/messages/testmessage.pb.h>

int main(int argc, char** argv) {
// Construct the object to be passed
GOOGLE_PROTOBUF_VERIFY_VERSION;

struct protoremove {
~protoremove(){ google::protobuf::ShutdownProtobufLibrary(); }
} remover;

ib::protobuf::testMessage myMessage;
myMessage.set_id(10);
myMessage.set_version(1);
std::cout << myMessage.DebugString() << std::endl;

// Initialize the Boost message queue
try{
//Open a message queue.
boost::interprocess::message_queue mq
(boost::interprocess::open_or_create
,"message_queue" //name
,100 //max message number
,1000 //max message size
);

// Send our message
std::ofstream buftosend;
myMessage.SerializeToOstream(&buftosend);
mq.send(&buftosend, sizeof(buftosend), 1);

}
catch(boost::interprocess::interprocess_exception &ex){
std::cout << ex.what() << std::endl;
return 1;
}

return 0;
}


Потребитель.cpp:



#include <iostream>
#include <fstream>

#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>

#include <boost/thread.hpp>

#include <internal/messages/testmessage.pb.h>

int main(int argc, char** argv) {
// Open the message queue
try {
//Erase previous message queue
boost::interprocess::message_queue::remove("message_queue");
ib::protobuf::testMessage recvdMessage;

//Create a message_queue.
boost::interprocess::message_queue mq
(boost::interprocess::open_or_create
,"message_queue" //name
,100 //max message number
,1000 //max message size
);

unsigned int priority;
boost::interprocess::message_queue::size_type recvd_size;

std::ifstream incomingbuf;
mq.receive(&incomingbuf, 1000, recvd_size, priority);

recvdMessage.ParseFromIstream(&incomingbuf);

recvdMessage.id();
recvdMessage.DebugString();
}
catch(boost::interprocess::interprocess_exception &ex){
boost::interprocess::message_queue::remove("message_queue");
std::cout << "IP error " << ex.what() << std::endl;
return 1;
}
boost::interprocess::message_queue::remove("message_queue");
return 0;

}


И определение сообщения (.прото):



package ib.protobuf;

message testMessage {
required int32 version = 1;
optional int64 id = 2;
optional string data = 3;
optional int64 sequencenumber = 4;
}


При запуске consumer он ожидает данных (mq.вызов receive() блокируется).
Когда производитель начинает, потребитель получает SIGSEGV. ГДБ указывает в своем обратном следе, что это происходит в строке 44, которая является методом ParseFromIstream ().
Производитель выводит правильные значения в DebugString ().



(gdb) r
Starting program: /home/roel/bin/consumer
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/usr/lib/libthread_db.so.1".

Program received signal SIGSEGV, Segmentation fault.
std::istream::sentry::sentry (this=0x7fffffffe117, __in=..., __noskip=true)
at /build/gcc-multilib/src/gcc-build/x86_64-pc-linux-gnu/libstdc++-v3/include/bits/istream.tcc:50
50 /build/gcc-multilib/src/gcc-build/x86_64-pc-linux-gnu/libstdc++-v3/include/bits/istream.tcc: No such file or directory.
(gdb) bt
#0 std::istream::sentry::sentry (this=0x7fffffffe117, __in=..., __noskip=true)
at /build/gcc-multilib/src/gcc-build/x86_64-pc-linux-gnu/libstdc++-v3/include/bits/istream.tcc:50
#1 0x00007ffff679f7ab in std::istream::read (this=0x7fffffffe380,
__s=0x637d20 "", __n=8192)
at /build/gcc-multilib/src/gcc-build/x86_64-pc-linux-gnu/libstdc++-v3/include/bits/istream.tcc:653
#2 0x00007ffff6b10030 in google::protobuf::io::IstreamInputStream::CopyingIstreamInputStream::Read(void*, int) () from /usr/lib/libprotobuf.so.9
#3 0x00007ffff6a99fe1 in google::protobuf::io::CopyingInputStreamAdaptor::Next(void const**, int*) () from /usr/lib/libprotobuf.so.9
#4 0x00007ffff6a97950 in google::protobuf::io::CodedInputStream::Refresh() ()
from /usr/lib/libprotobuf.so.9
#5 0x00007ffff6a94da3 in google::protobuf::MessageLite::ParseFromZeroCopyStream(google::protobuf::io::ZeroCopyInputStream*) () from /usr/lib/libprotobuf.so.9
#6 0x00007ffff6af5ad9 in google::protobuf::Message::ParseFromIstream(std::istream*) () from /usr/lib/libprotobuf.so.9
#7 0x0000000000407e35 in main (argc=1, argv=0x7fffffffe6d8)
at /home/roel/source/consumer.cpp:44
(gdb)


Это скомпилировано на Linux с использованием CMake и GCC 6.0.1.
У меня есть несколько вопросов о моей программе:





Q1. прежде всего; что может быть причиной ошибки сегментации?

Что я делаю не так? Я смотрел на этот код в течение многих часов, но не могу увидеть вопрос.





Q2. в boost:: interprocess:: message_queue
конструктор, я должен определить 2 параметра; максимальное число
сообщения, а также размер. Для стандартных типов этот размер равен
исправлено. Однако с сообщениями (в общем случае) размер сообщения
переменная величина. Итак, как лучше всего определить сумму
памяти, которая будет зарезервирована для сообщений?
должен ли я просто установить максимум
размер на сообщение и создать некоторые параметр многосоставного сообщения?





Q3. Есть ли лучший способ достичь моей цели? сериализация данных,
поставить его в очередь-это только кажется.. сложно, особенно
видеть это, вероятно, очень распространенная проблема. Должно быть что-то еще.
люди пытаются создать кросс-платформенный IPC. Библиотеки, такие как ZeroMQ
поддерживаются только доменные сокеты UNIX. Использование сокетов TCP для замыкания на себя
интерфейс просто кажется уродливым. Разве нет просто библиотеки, которая позволяет мне
помещать произвольные объекты (размер и расположение) в виде сообщений в общий каталог
сегмент памяти, который потребитель может затем pop()? Я имею в виду, в пределах ...
один поток, это может быть исправлено с помощью push() и pop() на стеке.
Выполнение всех этих дополнительных шагов кажется большим количеством накладных расходов.



Заранее благодарю вас за любой ответ.





Править



Как отмечает Dark, приведенный выше код использует экземпляр std::string вместо фактической строки (std::string.data())



Продюсер.СРР ответ ниже:



std::string str = myMessage.SerializeAsString();
mq.send(str.data(), str.size(), 1);


Однако это не работает как-есть для потребителя.cpp, так как строки инициализируются размером 0.

Вот код, который я использовал для потребителя.cpp:



unsigned int priority;
boost::interprocess::message_queue::size_type recvd_size;

//Reserve 1000 bytes of memory for our message
char incomingBuffer[1000];
mq.receive(&incomingBuffer, 1000, recvd_size, priority);

ib::protobuf::testMessage recvdMessage;

//Only if string object is really required
std::basic_string<char> str = incomingBuffer;
std::cout << "Message: " << str.data() << ". Size is " << recvd_size << std::endl;

//ParseFromString() can also directly parse "incomingBuffer", avoiding the cast above
recvdMessage.ParseFromString(str.data());

std::cout << "Message ID " << recvdMessage.id() << std::endl;
std::cout << recvdMessage.DebugString();
657   2  

2 ответов:

Эта часть продюсера кажется неправильной.

    // Send our message
    std::ofstream buftosend;
    myMessage.SerializeToOstream(&buftosend);
    mq.send(&buftosend, sizeof(buftosend), 1);

Файл ofstream не был открыт и поэтому не имеет файла для хранения чего-либо, поэтому первый вызов завершится неудачей (а не сбоем). Вызов send передает необработанную структуру класса ofstream через линию. Это не будет в передаваемом формате.

Я думаю, что вы хотите сериализовать в острингстрим, а затем передать содержимое острингстрима (а не весь объект).

Что-то вроде:

    // Send our message
    std::ostringstream buftosend;
    myMessage.SerializeToOstream(&buftosend);
    std::string str = buftosend.str();
    mq.send(str.data(), str.size(), 1); 

Или лучше и все же:

    // Send our message
    std::string str = myMessage.SerializeAsString();
    mq.send(str.data(), str.size(), 1); 

Вы также можете добавить строку отладки, чтобы показать, что такое содержимое str, хотя обратите внимание, что оно будет двоичным, поэтому не разборчиво.

У вас может возникнуть аналогичная проблема с вашим потребителем (ifstream должен быть открыт в файле).

A3:

ZeroMQ

Что касается ZeroMQ, то существует множество различных транспортных классов, которые могут использоваться одновременно. Поэтому, если кто-то хочет использовать самые низкие накладные расходы для локальной межпотоковой сигнализации, мы идем с inproc:// транспорт-класс, если идет локальный межпроцесс, то можно .bind() / .connect() Использование ipc:// транспорт-класс. Для межплатформенной распределенной обработки tcp:// или pgm:// или epgm:// транспорт-классы делают выбор легким по отношению к коммуникационным потребностям, согласованным с системой и сетевыми возможностями.

( Не стесняйтесь проверять другие сообщения, а также прямой URL-адрес книги Питера ХИНТЬЕНСА , которую необходимо прочитать для изучения проектирования распределенных систем )


nanomsg

Еще одна интеллектуальная и легкая платформа обмена сообщениями / сигнализации без брокера nanomsg происходит от Мартина СУСТРИКА, со-отца ZeroMQ. Опять же, INPROC, IPC, и еще TCP, транспортные классы готовы. Определенно стоит потратить несколько минут, чтобы прочитать его проницательные замечания на эту тему.


Однако использование этих фреймворков опирается, в силу разумного предположения, что агент обмена сообщениями не имеет понятия, какая система стоит на удаленном конце сокета обмена сообщениями / сигнализации, на некоторую представление объекта - будь то сериализация, контейнеризированный объект, объект-оболочка,-просто каждый несет ответственность за "подготовку" объекта своего желания к транспортировке как для отправки, так и для удаленного восстановления.

Использование обобщенного дизайна общей памяти, как было предложено, входит в другую архитектуру, тогда как ZeroMQ inproc:// и еще nanomsg INPROC транспортные классы-этонулевая копия почти-нулевая задержка примеры для это общая идея.

Comments

    Ничего не найдено.