Подумайте, не сделать ли приложение событийно-управляемым (2)
| | |
Здесь будет продолжено обсуждение, начатое в совете 20, а также проиллюстрировано использование функции tselect в приложениях и рассмотрены некоторые другие аспекты событийно-управляемого программирования. Вернемся к архитектуре с двумя соединениями из совета 19.
Взглянув на программу xout2 (листинг 3.13), вы увидите, что она не управляется событиями. Отправив сообщение удаленному хосту, вы не возвращаетесь к чтению новых данных из стандартного ввода, пока не придет подтверждение Причина в том, что таймер может сбросить новое сообщение. Если бы вы взвели таймер для следующего сообщения, не дождавшись подтверждения, то никогда не узнали бы, подтверждено старое сообщение или нет.
Проблема, конечно, в том, что в программе xout2 только один таймер и поэтому она не может ждать более одного сообщения в каждый момент. Воспользовавшись t select, вы сможете получить несколько таймеров из одного, предоставляемого select.
Представьте, что внешняя система из совета 19 - это шлюз, отправляющий сообщение третьей системе по ненадежному протоколу. Например, он мог бы посылать датаграммы в радиорелейную сеть. Предположим, что сам шлюз не дает информации о том, было ли сообщение успешно доставлено. Он просто переправляет сообщение и возвращает подтверждение, полученное от третьей системы.
Чтобы в какой-то мере обеспечить надежность, новый писатель xout3 повторно посылает сообщение (но только один раз), если в течение определенного времени не получает подтверждения. Если и второе сообщение не подтверждено, xout3 протоколирует этот факт и отбрасывает сообщение. Чтобы ассоциировать подтверждение с сообщением, на которое оно поступило, xout 3 включает в каждое сообщение некий признак. Конечный получатель сообщения возвращает этот признак в составе подтверждения. Начнем с рассмотрения секции объявлений xout3 (листинг 3.18)
Листинг 3.18. Объявления для программы xout3
1 #define ACK 0x6 /* Символ подтверждения АСК. */
2 #define MRSZ 128 /* Максимальное число неподтвержденных сообщений.*/
3 #define T1 3000 /* Ждать 3 с до первого АСК */
4 #define T2 5000 /* и 5 с до второго АСК. */
5 #define ACKSZ ( sizeof ( u_int32_t ) + 1 )
6 typedef struct /* Пакет данных. */
7 {
8 u_int32_t len; /* Длина признака и данных. */
9 u_int32_t cookie; /* Признак сообщения. */
10 char buf[ 128 ]; /* Сообщение. */
11 } packet_t;
12 typedef struct /* Структура сообщения. */
13 {
14 packet_t pkt; /* Указатель на сохраненное сообщение.*/
15 int id; /* Идентификатор таймера. */
16 } msgrec_t;
17 static msgrec_t mr[ MRSZ ];
18 static SOCKET s;
Объявления
5 Признак, включаемый в каждое сообщение, — это 32- разрядный порядковый номер сообщения. Подтверждение от удаленного хоста определяется как ASCII-символ АСК, за которым следует признак подтверждаемого сообщения. Поэтому константа ASCZ вычисляется как длина признака плюс 1.
6-11 Тип packet_t определяет структуру посылаемого пакета. Поскольку сообщения могут быть переменной длины, в каждый пакет включена длина сообщения. Удаленное приложение может использовать это поле для разбиения потока данных на отдельные записи (об этом шла речь в совете 6). Поле len - это общая длина самого сообщения и признака. Проблемы, связанные с упаковкой структур, рассматриваются в замечаниях после листинга 2.15.
12-16 Структура msgrec_t содержит структуру packet_t, посланную удаленному хосту. Пакет сохраняется на случай, если придется послать его повторно. Поле id - это идентификатор таймера, выступающего в роли таймера ретрансмиссии для этого сообщения.
17 С каждым неподтвержденным сообщением связана структура msgrec_t. Все они хранятся в массиве mr.
Теперь обратимся к функции main программы xout3 (листинг 3.19).
Листинг 3.19. Функция main программы xout3
1 int main( int argc, char **argv )
2 {
3 fd_set allreads;
4 fd_set readmask;
5 msgrec_t *mp;
6 int rc;
7 int mid;
8 int cnt = 0;
9 u_int32_t msgid = 0;
10 char ack[ ACKSZ ];
11 INIT();
12 s = tcp_client( argv[ 1 ], argv[ 2 ] );
13 FD_ZERO( &allreads );
14 FD_SET( s, &allreads );
15 FD_SET( 0, &allreads );
16 for ( mp = mr; mp < mr + MRSZ; mp++ )
17 mp->pkt.len = -1;
18 for ( ; ; )
19 {
20 readmask = allreads;
21 rc-= tselectf s + 1, &readmask, NULL, NULL );
22 if ( rc < 0 )
23 error( 1, errno, "ошибка вызова tselect" );
24 if ( rc == 0 )
25 error( 1, 0, "tselect сказала, что нет событий\n")
26 if ( FD_ISSET( s, &readmask ) )
27 {
28 rc = recv( s, ack + cnt, ACKSZ - cnt, 0 );
29 if ( rc == 0 )
30 error( 1, 0, "сервер отсоединился\n");
31 else if ( rc < 0 )
32 error( 1, errno, "ошибка вызова recv" );
33 if ( ( cnt += rc ) < ACKSZ ) /* Целое сообщение? */
34 continue; /* Нет, еще подождем. */
35 cnt =0; /* В следующий раз новое сообщение. */
36 if ( ack[ 0 ] != ACK)
37 {
38 error ( 0,0," предупреждение: неверное подтверждение\n");
39 continue;
40 }
41 memcpy( &mid, ack + 1, sizeof( u_int32_t ) );
42 mp = findmsgrec( mid );
43 if ( mp != NULL)
44 {
45 untimeout( mp->id ); /* Отменить таймер.*/
46 freemsgrecf mp ); /* Удалить сохраненное сообщение. */
47 }
48 }
49 if ( FD_ISSET( 0, &readmask ) )
50 {
51 mp = getfreerec ();
52 rc = read( 0, mp->pkt.buf, sizeoft mp->pkt.buf )
53 if ( rc < 0 )
54 error( 1, errno, "ошибка вызова read" );
55 mp->pkt.buf[ rc ] = '\0';
56 mp->pkt.cookie = msgid++;
57 mp->pkt.len = htonl( sizeof( u_int32_t ) + rc );
58 if ( send( s, &mp->pkt,
59 2 * sizeof( u_int32_t ) + rc, 0 ) < 0 )
60 error( 1, errno, "ошибка вызова send" );
61 mp->id = timeout( ( tofunc_t )lost_ACK, mp, Tl );
62 }
63 }
64 }
Инициализация
11-15 Так же, как и в программе xout2, соединяемся с удаленным хостои и инициализируем маски событий для tselect, устанавливая в них биты для дескрипторов stdin и сокета, который возвратилa tcp_client
16-17 Помечаем все структуры msgrec_t как свободные, записывая в поле длины пакета
18-25 Вызываем tselect точно так же, как select, только не передаем последний параметр (времени ожидания). Если tselect возвращает ошибку или нуль, то выводим диагностическое сообщение и завершаем программу. В отличие от select возврат нуля из tselect - свидетельство ошибки, так как все тайм-ауты обрабатываются внутри.
Обработка входных данных из сокета
26-32 При получении события чтения из сокета ожидаем подтверждение. В совете 6 говорилось о том, что нельзя применить recv в считывании ASCZ байт, поскольку, возможно, пришли еще не все данные. Нельзя воспользоваться и функцией типа readn, которая не возвращает управления до получения указанного числа байт, так как это противоречило бы событийно-управляемой архитектуре приложения, - ни одно событие не может быть обработано, пока readn не вернет управления. Поэтому пытаемся прочесть столько данных, сколько необходимо для завершения обработки текущего подтверждения. В переменной cnt хранится число ранее прочитанных байт, поэтому ASCZ - cnt - это число недостающих байт.
33-35 Если общее число прочитанных байт меньше ASCZ, то возвращаемся к началу цикла и назначаем tselect ожидание прихода следующей партии данных или иного события. Если после только что сделанного вызова recv подтверждение получено, то сбрасываем cnt в нуль в ожидании следующего подтверждения (к этому моменту не было прочитано еще ни одного байта следующего подтверждения).
36-40 Далее, в соответствии с советом 11, выполняем проверку правильности полученных данных. Если сообщение - некорректное подтверждение, печатаем диагностическое сообщение и продолжаем работу. Возможно, здесь было бы правильнее завершить программу, так как удаленный хост послал неожиданные данные.
41- 42 Наконец, извлекаем из подтверждения признак сообщения, вызываем findmsgrec для получения указателя на структуру msgrec_t, ассоциированную с сообщением, и используем ее для отмены таймера, после чего освобождаем msgrec_t. Функции findmsgrec и freemsgrec приведены в листинге 3.20.
Обработка данных из стандартного ввода
51-57 Когда tselect сообщает о событии ввода из stdin, получаем структуру msgrec_t и считываем сообщение в пакет данных. Присваиваем сообщению порядковый номер, пользуясь счетчиком msgid, и сохраняем его в поле cookie пакета. Обратите внимание, что вызывать htonl не нужно, так как удаленный хост не анализирует признак, а возвращает его без изменения. Записываем в поля пакета полную длину сообщения вместе с признаком. На этот раз вызываем htonl, так как удаленный хост использует это поле для чтения оставшейся части сообщения (совет 28).
55-61 Посылаем подготовленный пакет удаленному хосту и взводим таймер ретрансмиссии, обращаясь к функции timeout.
Оставшиеся функции программы xout3 приведены в листинге 3.20.
Листинг 3.20. Вспомогательные функции программы xout3
1 msgrec_t *getfreerec( void )
2 {
3 msgrec_t *mp;
4 for ( mp = mr; mp < mr + MRSZ; mp++ )
5 if ( mp->pkt.len == -1 ) /* Запись свободна? */
6 return mp;
7 error(1,0, "getfreerec: исчерпан пул записей сообщений\n" );
8 return NULL; /* "Во избежание предупреждений компилятора.*/
9 }
10 msgrec_t *findmsgrec( u_int32_t mid )
11 {
12 msgrec_t *mp;
13 for ( mp = mr; mp < mr + MRSZ; mp++ )
14 if ( mp->pkt.len != -1 && mp->pkt.cookie == mid )
15 return mp;
16 error (0, 0,"findmsgrec: нет сообщения, соответствующего ACK %d\n", mid);
17 return NULL;
18 }
19 void freemsgrec( msgrec_t *mp )
20 {
21 if ( mp->pkt.len == -1 )
22 error(1,0, "freemsgrec: запись сообщения уже освобождена\n" };
23 mp->pkt.len = -1;
24 }
25 gtatic void drop( msgrec_t *mp )
26 {
27 error( 0, 0, "Сообщение отбрасывается: %s", mp->pkt.buf );
28 freemsgrec( mp );
29 }
30 static void lost_ACK( msgrec_t *mp )
31 {
32 error( 0, 0, "Повтор сообщения: %s", mp->pkt.buf );
33 if ( send( s, &mp->pkt,
34 sizeof( u_int32_t ) + ntohl( mp->pkt.len ), 0 ) < 0 )
35 error ( 1, errno, " потерян АСК: ошибка вызова send" );
36 mp->id = timeout) ( tofunc_t )drop, mp, T2 );
37 }
getfreerec
1-9 Данная функция ищет свободную запись в таблице mr. Просматриваем последовательно весь массив, пока не найдем пакет с длиной -1. Это означает, что запись свободна. Если бы массив mr был больше, то можно было бы завести список свободных, как было сделано для записей типа tevent_t в листинге 3.15.
findmsgrec
10-18 Эта функция почти идентичная get f reerec, только на этот раз ищем запись с заданным признаком сообщения.
freemsgrec
19-24 Убедившись, что данная запись занята, устанавливаем длину пакета в -1, помечая тем самым, что теперь она свободна.
drop
25-29 Данная функция вызывается, если не пришло подтверждение на второе посланное сообщение (см. lost_ACK). Пишем в протокол диагностику и отбрасываем запись, вызывая freemsgrec.
lost_ACK
30-37 Эта функция вызывается, если не пришло подтверждение на первое сообщение. Посылаем сообщение повторно и взводим новый таймер ре-трансмиссии, указывая, что при его срабатывании надо вызвать функцию drop.
Для тестирования xout3 напишем серверное приложение, которое случайным образом отбрасывает сообщения. Назовем этот сервер extsys (сокращение от external system - внешняя система). Его текст приведен в листинге 3.21.
Листинг 3.21. Внешняя система
extsys.c
1 #include "etcp.h"
2 #define COOKIESZ 4 /* Так установлено клиентом. */
3 int main ( int argc, char **argv )
4 {
5 SOCKET s;
6 SOCKET s1;
7 int rc;
8 char buf[ 128 ] ;
9 INIT();
10 s = tcp_server( NULL, argv[ 1 ] );
11 s1 = accept( s, NULL, NULL );
12 if ( !isvalidsock) s1 ) )
13 error( 1, errno, "ошибка вызова accept" );
!4 srand( 127 );
15 for ( ;; )
16 {
17 rc = readvrec( s1, buf, sizeof( buf ) );
18 if ( rc == 0 )
19 error( 1, 0, "клиент отсоединился\n" );
20 if ( rc < 0 )
21 error( 1, errno, "ошибка вызова recv" );
22 if ( rand() % 100 < 33 )
23 continue;
24 write! 1, buf + COOKIESZ, rc - COOKIESZ );
25 memmove( buf + 1, buf, COOKIESZ );
26 buf[ 0 ] = ' \006';
27 if ( send( s1, buf, 1 + COOKIESZ, 0 ) < 0 )
28 error( 1, errno, "ошибка вызова send" );
29 }
30 }
Инициализация
9- 14 Выполняем обычную инициализацию сервера и вызываем функцию srand для инициализации генератора случайных чисел.
Премечание: Функция rand из стандартной библиотеки С работает быстрои проста в применении, но имеет ряд нежелательных свойств. Хотя для демонстрации xout3 она вполне пригодна, но для серьезного моделирования нужно было бы воспользоваться более развитым генератором случайных чисел [Knuth 1998].
17-21 Вызываем функцию readvrec для чтения записи переменной длины, посланной xout3.
22-23 Случайным образом отбрасываем примерно треть получаемых сообщений.
24-28 Если сообщение не отброшено, то выводим его на stdout, сдвигаем в буфере признак на один символ вправо, добавляем в начало символ АСК и возвращаем подтверждение клиенту.
Вы тестировали xout3, запустив extsys в одном окне и воспользовавшись конвейером из совета 20 в другом (рис. 3.7).
Можно сделать следующие замечания по поводу работы xout3:
bsd $ mp I xout3 localhost 9000 xout3: Повтор сообщения: message 3 xout3: Повтор сообщения: message 4 xout3: Повтор сообщения: message 5 xoutS: Сообщение отбрасывается: message 4 xout3: Сообщение отбрасывается: message 5 xout3: Повтор сообщения: message 11 xout3: Повтор сообщения: message 14 xout3: Сообщение отбрасывается: message 11 xout3: Повтор сообщения: message 16 xout3: Повтор сообщения: message 17 xout3: Сообщение отбрасывается: message 14 xout3: Повтор сообщения: message 19 xout3: Повтор сообщения: message 20 xout3: Сообщение отбрасывается: message 16 xout3: Сервер отсоединился Broken pipe bsd $ |
bsd $ extsys 9000 message 1 message 2 message 3 message 6 message 7 message 8 message 9 message 10 message 12 message 13 message 15 message 18 message 17 message 21 message 20 message 23 ^C сервер остановлен bsd $ |
Рис. 3.7. Демонстрация xout 3