Попытка асинхронного ввода-вывода с потоками Win32



Я пишу программу последовательного порта для Windows. Чтобы повысить производительность, я пытаюсь преобразовать подпрограммы в асинхронный ввод-вывод. у меня есть код, который работает довольно хорошо, но я новичок в этом, и я хотел бы улучшить производительность программы дальше. Во время стресс-тестов программы (т. е. пакетных данных в/из порта как можно быстрее при высокой скорости передачи) нагрузка на процессор становится довольно высокой.



Если у кого-то есть опыт асинхронного ввода-вывода и многопоточность в Windows, я был бы признателен, если бы вы могли взглянуть на мою программу. У меня есть две основные проблемы:





  • Правильно ли реализован асинхронный ввод-вывод? Я нашел довольно надежный источник в сети, предполагающий, что вы можете передавать пользовательские данные в функции обратного вызова, реализуя свою собственную перекрывающуюся структуру с вашими собственными данными в конце. Это, кажется, работает просто отлично, но это действительно выглядит немного "халтурно" для меня. Кроме того, производительность программы не улучшилась. многое, когда я преобразовал из синхронного / опрошенного в асинхронный / обратный вызов, заставляя меня подозревать, что я делаю что-то не так.



  • Разумно ли использовать STL std:: deque для буферов данных FIFO? Поскольку программа в настоящее время написана, я разрешаю получать только 1 байт данных за один раз, прежде чем он должен быть обработан. Поскольку я не знаю, сколько данных я получу, это может быть бесконечное количество. Я предполагаю, что этот 1-байт-за-раз приведет к вялому поведению за линиями deque, когда он имеет чтобы выделить данные. И я не доверяю Деку, чтобы быть потокобезопасным также (должен ли я?).
    Если использование STL deque не является разумным, есть ли какие-либо предложения по использованию лучшего типа данных? Статический массив на основе кругового кольцевого буфера?



Любые другие отзывы о коде также приветствуются.





Последовательные подпрограммы реализованы таким образом, что у меня есть родительский класс под названием "Comport", который обрабатывает все, что связано с последовательным вводом-выводом. От этого класса я унаследовал другой класс, называемый "ThreadedComport", который является многопоточным вариантом.



Класс ThreadedComport (соответствующие его части)



class ThreadedComport : public Comport
{
private:

HANDLE _hthread_port; /* thread handle */
HANDLE _hmutex_port; /* COM port access */
HANDLE _hmutex_send; /* send buffer access */
HANDLE _hmutex_rec; /* rec buffer access */

deque<uint8> _send_buf;
deque<uint8> _rec_buf;
uint16 _data_sent;
uint16 _data_received;

HANDLE _hevent_kill_thread;
HANDLE _hevent_open;
HANDLE _hevent_close;
HANDLE _hevent_write_done;
HANDLE _hevent_read_done;
HANDLE _hevent_ext_send; /* notifies external thread */
HANDLE _hevent_ext_receive; /* notifies external thread */

typedef struct
{
OVERLAPPED overlapped;
ThreadedComport* caller; /* add user data to struct */
} OVERLAPPED_overlap;

OVERLAPPED_overlap _send_overlapped;
OVERLAPPED_overlap _rec_overlapped;
uint8* _write_data;
uint8 _read_data;
DWORD _bytes_read;

static DWORD WINAPI _tranceiver_thread (LPVOID param);
void _send_data (void);
void _receive_data (void);
DWORD _wait_for_io (void);

static void WINAPI _send_callback (DWORD dwErrorCode,
DWORD dwNumberOfBytesTransfered,
LPOVERLAPPED lpOverlapped);
static void WINAPI _receive_callback (DWORD dwErrorCode,
DWORD dwNumberOfBytesTransfered,
LPOVERLAPPED lpOverlapped);

};


Основная процедура потока, созданная с помощью CreateThread():



DWORD WINAPI ThreadedComport::_tranceiver_thread (LPVOID param)
{
ThreadedComport* caller = (ThreadedComport*) param;

HANDLE handle_array [3] =
{
caller->_hevent_kill_thread, /* WAIT_OBJECT_0 */
caller->_hevent_open, /* WAIT_OBJECT_1 */
caller->_hevent_close /* WAIT_OBJECT_2 */
};

DWORD result;

do
{
/* wait for anything to happen */
result = WaitForMultipleObjects(3,
handle_array,
false, /* dont wait for all */
INFINITE);

if(result == WAIT_OBJECT_1 ) /* open? */
{
do /* while port is open, work */
{
caller->_send_data();
caller->_receive_data();
result = caller->_wait_for_io(); /* will wait for the same 3 as in handle_array above,
plus all read/write specific events */

} while (result != WAIT_OBJECT_0 && /* while not kill thread */
result != WAIT_OBJECT_2); /* while not close port */
}
else if(result == WAIT_OBJECT_2) /* close? */
{
; /* do nothing */
}

} while (result != WAIT_OBJECT_0); /* kill thread? */

return 0;
}


, который в свою очередь вызывает следующие три функции:



void ThreadedComport::_send_data (void)
{
uint32 send_buf_size;

if(_send_buf.size() != 0) // anything to send?
{
WaitForSingleObject(_hmutex_port, INFINITE);
if(_is_open) // double-check port
{
bool result;

WaitForSingleObject(_hmutex_send, INFINITE);
_data_sent = 0;
send_buf_size = _send_buf.size();
if(send_buf_size > (uint32)_MAX_MESSAGE_LENGTH)
{
send_buf_size = _MAX_MESSAGE_LENGTH;
}
_write_data = new uint8 [send_buf_size];


for(uint32 i=0; i<send_buf_size; i++)
{
_write_data[i] = _send_buf.front();
_send_buf.pop_front();
}
_send_buf.clear();
ReleaseMutex(_hmutex_send);


result = WriteFileEx (_hcom, // handle to output file
(void*)_write_data, // pointer to input buffer
send_buf_size, // number of bytes to write
(LPOVERLAPPED)&_send_overlapped, // pointer to async. i/o data
(LPOVERLAPPED_COMPLETION_ROUTINE )&_send_callback);

SleepEx(INFINITE, true); // Allow callback to come

if(result == false)
{
// error handling here
}

} // if(_is_open)
ReleaseMutex(_hmutex_port);
}
else /* nothing to send */
{
SetEvent(_hevent_write_done); // Skip write
}
}


void ThreadedComport::_receive_data (void)
{
WaitForSingleObject(_hmutex_port, INFINITE);

if(_is_open)
{
BOOL result;

_bytes_read = 0;
result = ReadFileEx (_hcom, // handle to output file
(void*)&_read_data, // pointer to input buffer
1, // number of bytes to read
(OVERLAPPED*)&_rec_overlapped, // pointer to async. i/o data
(LPOVERLAPPED_COMPLETION_ROUTINE )&_receive_callback);

SleepEx(INFINITE, true); // Allow callback to come

if(result == FALSE)
{
DWORD last_error = GetLastError();
if(last_error == ERROR_OPERATION_ABORTED) // disconnected ?
{
close(); // close the port
}
}
}

ReleaseMutex(_hmutex_port);
}



DWORD ThreadedComport::_wait_for_io (void)
{
DWORD result;
bool is_write_done = false;
bool is_read_done = false;

HANDLE handle_array [5] =
{
_hevent_kill_thread,
_hevent_open,
_hevent_close,
_hevent_write_done,
_hevent_read_done
};


do /* COM port message pump running until sending / receiving is done */
{
result = WaitForMultipleObjects(5,
handle_array,
false, /* dont wait for all */
INFINITE);

if(result <= WAIT_OBJECT_2)
{
break; /* abort */
}
else if(result == WAIT_OBJECT_3) /* write done */
{
is_write_done = true;
SetEvent(_hevent_ext_send);
}
else if(result == WAIT_OBJECT_4) /* read done */
{
is_read_done = true;

if(_bytes_read > 0)
{
uint32 errors = 0;

WaitForSingleObject(_hmutex_rec, INFINITE);
_rec_buf.push_back((uint8)_read_data);
_data_received += _bytes_read;

while((uint16)_rec_buf.size() > _MAX_MESSAGE_LENGTH)
{
_rec_buf.pop_front();
}

ReleaseMutex(_hmutex_rec);
_bytes_read = 0;

ClearCommError(_hcom, &errors, NULL);
SetEvent(_hevent_ext_receive);
}
}
} while(!is_write_done || !is_read_done);

return result;
}


Асинхронные функции обратного вызова ввода-вывода:



void WINAPI ThreadedComport::_send_callback (DWORD dwErrorCode,
DWORD dwNumberOfBytesTransfered,
LPOVERLAPPED lpOverlapped)
{
ThreadedComport* _this = ((OVERLAPPED_overlap*)lpOverlapped)->caller;

if(dwErrorCode == 0) // no errors
{
if(dwNumberOfBytesTransfered > 0)
{
_this->_data_sent = dwNumberOfBytesTransfered;
}
}


delete [] _this->_write_data; /* always clean this up */
SetEvent(lpOverlapped->hEvent);
}


void WINAPI ThreadedComport::_receive_callback (DWORD dwErrorCode,
DWORD dwNumberOfBytesTransfered,
LPOVERLAPPED lpOverlapped)
{
if(dwErrorCode == 0) // no errors
{
if(dwNumberOfBytesTransfered > 0)
{
ThreadedComport* _this = ((OVERLAPPED_overlap*)lpOverlapped)->caller;
_this->_bytes_read = dwNumberOfBytesTransfered;
}
}

SetEvent(lpOverlapped->hEvent);
}
722   3  

3 ответов:

Первый вопрос очень прост. Метод не является хакерским; вы владеете памятью OVERLAPPED и всем, что следует за ней. Это лучше всего описано Раймондом Ченом: http://blogs.msdn.com/b/oldnewthing/archive/2010/12/17/10106259.aspx

Вы можете ожидать улучшения производительности только в том случае, если у вас есть лучшие вещи, ожидая завершения ввода-вывода. Если все, что вы делаете, это SleepEx, вы увидите, что CPU% падает. Ключ к разгадке находится в названии "перекрыто" - оно позволяет перекрывать вычисления и ввод-вывод

std::deque<unsigned char> может обрабатывать данные FIFO без больших проблем. Он, вероятно, переработает 4KB кусков (точное число определяется обширным профилированием, все сделано для вас).

[править] Я немного углубился в ваш код, и мне кажется, что он излишне сложен. Для начала, одним из главных преимуществ асинхронного ввода-вывода является то, что вам не нужно все это потоковое оборудование. Потоки позволяют использовать больше ядер, но вы имеете дело с медленным устройством ввода-вывода. Даже одно ядро достаточно, Если он не тратит все свое время на ожидание. И это именно то, для чего перекрывается ввод-вывод. Вы просто выделяете один поток для всех операций ввода-вывода для порта. Поскольку это единственный поток, ему не нужен мьютекс для доступа к этому порту.

OTOH, вы хотели бы иметь мьютекс вокруг объектов deque<uint8>, так как потоки производителя/потребителя не совпадают с потоком comport.

Я не вижу никаких причин для использования асинхронного ввода-вывода в подобном проекте. Асинхронный ввод-вывод хорош, когда вы обрабатываете большое количество сокетов или работаете в ожидании данных, но, насколько я могу судить, Вы имеете дело только с одним сокетом и не делаете никакой работы между ними.

Кроме того, просто ради знания, вы обычно используете порт завершения ввода-вывода для обработки асинхронного ввода-вывода. я не уверен, есть ли какие-либо ситуации, когда использование завершения ввода-вывода порт оказывает негативное влияние на производительность.

Но да, ваше асинхронное использование ввода - вывода выглядит нормально. Реализация вашей собственной структуры OVERLAPPED действительно выглядит как хак, но это правильно; нет другого способа связать ваши собственные данные с завершением.

Boost также имеет реализацию циклического буфера, хотя я не уверен, что это потокобезопасно. Однако ни один из стандартных библиотечных контейнеров не является потокобезопасным.

Я думаю, что ваш код имеет неоптимальный дизайн.

  • Я думаю, что Вы делитесь слишком большим количеством структур данных со слишком большим количеством потоков. Я думаю, что вы должны поместить всю обработку последовательного устройства ввода-вывода для одного порта в один поток и поместить синхронизированную очередь команд/данных между потоком ввода-вывода и всеми клиентскими потоками. Пусть поток ввода-вывода следит за командами / данными в очереди.

  • Похоже, вы выделяете и освобождаете некоторые буферы для каждого отправленного события. Избегать тот. Если вы храните все операции ввода-вывода в одном потоке, вы можете повторно использовать один буфер. Вы ограничиваете размер сообщения в любом случае, вы можете просто предварительно выделить один достаточно большой буфер.

  • Размещение байтов, которые вы хотите отправить в std::deque, является неоптимальным. Вы должны сериализовать их в непрерывный блок памяти для WriteFile(). Вместо этого, если вы используете некоторую очередь commdand / data между одним потоком ввода-вывода и другими потоками, вы можете использовать клиентские потоки, обеспечивающие непрерывное кусок памяти сразу.

  • Чтение 1 байта за раз тоже кажется глупым. Если он не работает для последовательных устройств, вы можете предоставить достаточно большой буфер для ReadFileEx(). Он возвращает, сколько байт ему удалось прочитать на самом деле. Это не должно мешать, АФАИК, если, конечно, я не ошибаюсь.

  • Вы ждете, пока наложенный IO завершит использование вызова SleepEx(). В чем смысл перекрытого ИО, если вы просто заканчиваете быть синхронно?

Comments

    Ничего не найдено.