учебники, программирование, основы, введение в,
Домашние Фото, частное фото голых девушек.

 

Средства межпроцессного взаимодействия в реальном времени

Передача и прием сообщений в реальном времени
Средства локальной передачи и приема сообщений присутствуют, вероятно, во всех или почти во всех операционных системах. Во многих версиях ОС Unix представлено по нескольку разновидностей подобных средств. Следуя за исторически сложившимися реализациями, стандарт POSIX-2001 предусматривает два вида очередей сообщений и обслуживающих их функций. Один из них был рассмотрен нами в курсе [1]. Здесь мы опишем вторую разновидность, предназначенную для использования в системах реального времени.
Описываемые очереди сообщений являются общесистемными. Они доступны по именам, которые могут быть маршрутными именами файлов.
Над очередями сообщений определены следующие группы функций:

  • открытие очереди;
  • отправка сообщения в очередь;
  • прием (синхронный или асинхронный) сообщения из очереди;
  • изменение атрибутов очереди;
  • регистрация на получение уведомления о появлении сообщения в очереди;
  • закрытие очереди;
  • удаление очереди.

Одну очередь могут открыть несколько посылающих и/или принимающих сообщения процессов. При открытии может производиться контроль прав доступа.
Для каждой очереди задается фиксированная верхняя граница размера сообщений, которые могут быть в эту очередь отправлены.
На порядок приема влияет имеющийся механизм приоритетов сообщений.
Процесс может получать асинхронные уведомления о том, что в очереди появилось сообщение.
Для открытия очереди служит функция mq_open() (см., которая, по аналогии с файлами, создает описание открытой очереди и ссылающийся на него дескриптор типа mqd_t, возвращаемый в качестве нормального результата.
#include <mqueue.h>
mqd_t mq_open (
const char *name, int oflag, ...);
Трактовка имени очереди (аргумент name) зависит от реализации. Оговаривается только, что оно должна подчиняться ограничениям, налагаемым на маршрутные имена, и что если его первым символом является '/', то процессы, вызывающие mq_open() с одинаковыми значениями name, ссылаются на одну и ту же очередь (если, конечно, ее не удаляли).
При реализации дескрипторов очередей сообщений могут применяться файловые дескрипторы. В таком случае приложение может открыть не менее OPEN_MAX файлов и очередей.
Аргумент oflag специфицирует запрашиваемые виды доступа к очереди: на прием (чтение) и отправку (запись). Контроль прав доступа для очередей сообщений производится так же, как и для файлов.
В целом набор флагов и их трактовка для очередей сообщений те же, что и для файлов: O_RDONLY, O_WRONLY, O_RDWR, O_CREAT, O_EXCL, O_NONBLOCK.
Если установлен флаг O_CREAT, то при вызове функции mq_open() необходимо задать два дополнительных аргумента: режим доступа (тип mode_t) и указатель на атрибутный объект (тип struct mq_attr *) создаваемой очереди сообщений.
Согласно стандарту POSIX-2001, структура типа mq_attr, описанная в заголовочном файле <mqueue.h>, содержит по крайней мере следующие поля.
long mq_flags; 
/* Флаги очереди сообщений          */
long mq_maxmsg;
/* Максимальное число сообщений
в очереди  */
long mq_msgsize;
/* Максимальный размер сообщения
в очереди */
long mq_curmsgs;
/* Текущее число сообщений в очереди */
Для опроса и/или установки атрибутов очереди служат функции mq_getattr() и mq_setattr() (см.). Впрочем, про установку атрибутов сказано, пожалуй слишком сильно: посредством вызова 2 можно изменить лишь состояние флага 2 (и, возможно, некоторых других флагов, зависящих от реализации).
#include <mqueue.h>

int mq_getattr (
mqd_t mqdes, struct mq_attr *mqstat);

int mq_setattr (mqd_t mqdes,
const struct mq_attr *restrict mqstat,
struct mq_attr *restrict omqstat);
После того, как процесс завершил работу с очередью сообщений, соответствующий дескриптор следует закрыть, воспользовавшись функцией mq_close() (
  
#include <mqueue.h>
int mq_close (mqd_t mqdes);
Если очередь сообщений стала совсем ненужной, ее можно удалить с помощью функции mq_unlink() (см.
  
#include <mqueue.h>
int mq_unlink (const char *name);


Переходя к описанию содержательных действий с очередями сообщений, укажем, что отправка осуществляется функциями mq_send() и mq_timedsend()
  
#include <mqueue.h>
int mq_send (mqd_t mqdes,
const char *msg_ptr,
size_t msg_len,
unsigned msg_prio);
  
#include <mqueue.h>
#include <time.h>
int mq_timedsend (
mqd_t mqdes, const char *msg_ptr,
size_t msg_len, unsigned msg_prio,
const struct timespec *abstime);
Более точно: функции mq_send() и mq_timedsend() помещают сообщение из msg_len байт, на которое указывает аргумент msg_ptr, в очередь, заданную дескриптором mqdes (если она не полна), в соответствии с приоритетом msg_prio (большим значениям msg_prio соответствует более высокий приоритет сообщения; допустимый диапазон - от 0 до MQ_PRIO_MAX - 1).
Если очередь полна, а флаг O_NONBLOCK не установлен, вызов mq_send() блокируется до появления свободного места. Функция mq_timedsend() в таких случаях контролирует длительность ожидания (до заданного аргументом abstime абсолютного момента времени по часам CLOCK_REALTIME).
Для извлечения (разумеется, с удалением) сообщений из очереди служат функции mq_receive() и mq_timedreceive() (смИзвлекается самое старое из сообщений с самым высоким приоритетом и помещается в буфер, на который указывает аргумент msg_ptr. Если размер буфера (значение аргумента msg_len) меньше атрибута очереди mq_msgsize, вызов завершается неудачей. Если значение msg_prio_ptr отлично от NULL, в указуемый объект помещается приоритет принятого сообщения.
#include <mqueue.h>
ssize_t mq_receive (
mqd_t mqdes, char *msg_ptr,
size_t msg_len, unsigned *msg_prio_ptr);
#include <mqueue.h>
#include <time.h>
ssize_t mq_timedreceive (
mqd_t mqdes, char *restrict msg_ptr,
size_t msg_len,
unsigned *restrict msg_prio_ptr,
const struct timespec *restrict abstime);

Если очередь пуста, а флаг O_NONBLOCK не установлен, вызов mq_receive() блокируется до появления сообщения. Функция mq_timedreceive() в таких случаях контролирует длительность ожидания.
Нормальным результатом обеих функций является размер в байтах извлеченного из очереди сообщения. Как всегда, признаком ошибки служит -1.
Посредством функции mq_notify() (см. процесс может зарегистрироваться на получение уведомления о том, что в очередь, бывшую до этого пустой, поступило сообщение.
  
#include <mqueue.h>
int mq_notify (mqd_t mqdes,
const struct sigevent *notification);
Для уведомлений используется механизм сигналов реального времени. В каждый момент времени только один процесс может быть зарегистрированным, а сама регистрация является одноразовой: после поступления уведомления она отменяется. Процесс может добровольно отменить регистрацию, если передаст NULL в качестве значения аргумента notification.
Если, наряду с зарегистрированным процессом, имеется поток управления, ожидающий сообщения в вызове mq_receive() или mq_timedreceive(), поступившее сообщение достанется потоку, а процесс не получит никакого уведомления, как если бы очередь осталась пустой. Это очень по-человечески: живое стояние в очереди всегда ценилось выше всяких списков и уведомлений.
На наш взгляд, возможность получать уведомления о том, что очередь сообщений стала непустой, является скорее заплатой, призванной скрыть неполноту функциональности poll() и select(), чем полноценным, практически полезным средством. В стандартизованном интерфейсе отсутствует концептуальная целостность - из соображений симметрии необходимы уведомления о том, что очередь стала неполной и в нее можно отправлять сообщения. Далее, состояние очереди может меняться параллельно с регистрацией и/или получением уведомления, и нет никаких средств, чтобы сделать соответствующую транзакцию атомарной. В результате остается неясным, когда процесс получит уведомление (и получит ли он его вообще), какое сообщение он примет после получения уведомления (и будет ли что принимать) и т.п., хотя, с другой стороны, устраивать конкуренцию за сообщения тоже не обязательно.
Поучительно сопоставить два вида очередей сообщений - только что описанные и те, что были представлены в курсе . Сразу видно, что первые устроены проще и, следовательно, могут быть реализованы эффективнее. Во-первых, структура типа mq_attr гораздо компактнее, чем msqid_ds. Нет и речи о хранении времени последних операций и идентификаторов процессов, их выполнивших. Во-вторых, упрощены производимые проверки. Контролируется максимальный размер одного сообщения, а не суммарный размер сообщений в очереди. Права доступа проверяются при открытии, а не при каждой операции. Только наличие приоритетов вносит элемент сложности, сопоставимый с обработкой типов сообщений, но приоритеты так или иначе должны быть реализованы - либо на уровне ОС, либо в приложении, поэтому лучше этот аспект стандартизовать и поручить операционной системе.
Примером применения очередей сообщений послужит программа, вычисляющая взвешенную сумму целых чисел, генерируемых и отправляемых множеством потоков управления (см.
Обратим внимание на то, что если в программе для отправки и приема сообщений используются буфера одного размера, он должен равняться значению атрибута mq_msgsize, которое задается при создании очереди. Отметим также применение режима без блокировки, что важно для приложений реального времени.

Семафоры реального времени
Семафор реального времени - это эффективный механизм синхронизации процессов, представляющий собой общесистемный разделяемый ресурс, имеющий неотрицательное целочисленное значение.
Основными операциями над семафором являются захват и освобождение. Если делается попытка захвата семафора, когда его значение равно нулю, выполнение вызывающего потока управления приостанавливается и он добавляется к множеству потоков, ждущих на семафоре; в противном случае значение уменьшается.
Если при освобождении семафора множество ждущих потоков было непусто, один из них удаляется из этого множества и его выполнение возобновляется; в противном случае значение семафора просто увеличивается.
Семафоры бывают именованными и безымянными (неименованными). Первые именуются цепочками символов и создаются функцией sem_open() с флагом O_CREAT, вторые создаются функцией sem_init(). При прочих операциях семафор идентифицируется открытым дескриптором (который может быть унаследован у родительского процесса, вызвавшего fork()). Дескриптор реализуется как указатель на объект типа sem_t.
Перед выполнением операций семафор необходимо инициализировать, задав неотрицательное значение. Отрицательные значения (точнее, их абсолютная величина) могут использоваться реализацией для указания числа ждущих потоков управления.
Семафор сохраняет свое состояние после закрытия последней ссылки на него, то есть если позднее он будет вновь открыт, его значение окажется тем же, что и перед закрытием.
Детальное описание функций, обслуживающих семафоры, мы начнем, разумеется, с sem_open() и sem_init() (см., которые обеспечивают открытие, создание и инициализацию.
#include <semaphore.h>

sem_t *sem_open (const char *name, int oflag, ...);

int sem_init (sem_t *sem, int pshared, unsigned value);
Имена семафоров (аргумент name функции sem_open()) устроены и трактуются так же, как и описанные выше имена очередей сообщений. Флагов (в аргументе oflag) может быть установлено два: O_CREAT и/или O_EXCL. Если установлен флаг O_CREAT, то при вызове функции sem_open() необходимо задать два дополнительных аргумента: режим доступа (тип mode_t) и значение семафора (тип unsigned int).
В случае ошибки функция sem_open() возвращает значение SEM_FAILED, отличное от любого допустимого указателя на объект типа sem_t, а sem_init() "по старинке" возвращает -1.
Отметим, что нормальный результат для функции sem_init() в POSIX-2001 не стандартизован; вероятно, в будущих версиях им станет нуль. Инициализированный объект типа sem_t помещается по указателю sem. Аргумент value задает начальное значение создаваемого неименованного семафора. Если аргумент pshared отличен от нуля, семафор разделяется между процессами; в противном случае разделение возможно только между потоками управления вызывающего процесса.
Именованный семафор можно закрыть, обратившись к функции sem_close(), и удалить с помощью функции sem_unlink(); для ликвидации неименованных семафоров служит функция sem_destroy() (см). Нормальный результат этих функций равен нулю, в случае ошибки возвращается -1.
  
#include <semaphore.h>

int sem_close (sem_t *sem);

int sem_unlink (const char *name);

int sem_destroy (sem_t *sem);
Листинг 4.12. Описание функций закрытия и ликвидации семафоров.
Отметим, что эффект от вызова sem_close() для неименованного семафора, равно как и последствия вызова sem_destroy() для семафора именованного, не определены. Также не определен результат применения функции sem_destroy() к семафору, на котором имеются ждущие потоки управления.
Для захвата семафоров служат функции sem_wait(), sem_trywait() и sem_timedwait()
  
#include <semaphore.h>

int sem_wait (sem_t *sem);

int sem_trywait (sem_t *sem);
Листинг 4.13. Описание функций захвата семафоров. (  
#include <semaphore.h>
#include <time.h>
int sem_timedwait (sem_t *restrict sem,
const struct timespec *restrict abstime);
Листинг 4.14. Описание функции захвата семафора с контролем времени ожидания.
Если значение семафора было положительным, все три перечисленные функции без каких-либо задержек завершаются успехом, уменьшая это значение до нуля и возвращая нулевой результат. В противном случае вызов sem_trywait() завершается неудачей, вызов sem_wait() блокируется до освобождения семафора, вызов sem_timedwait() также блокируется, но с контролем времени ожидания.
Освобождение семафора осуществляется функцией sem_post() (). Для возобновления выполнения (если таковое имеет место) выбирается наиболее приоритетный поток управления, а среди потоков с равными приоритетами - тот, что ждал дольше других.
  
#include <semaphore.h>
int sem_post (sem_t *sem);
Листинг 4.15. Описание функции sem_post().Функция sem_getvalue()) позволяет опросить значение семафора, не меняя его состояния.
  
#include <semaphore.h>
int sem_getvalue (sem_t *restrict sem, int *restrict sval);
Листинг 4.16. Описание функции sem_getvalue().Значение семафора записывается по указателю sval. Если семафор был захвачен, это значение окажется нулевым или отрицательным.
Если сопоставить семафоры реального времени с теми, что были описаны в курсе [1], то можно сделать те же выводы, что и для очередей сообщений. Семафоры реального времени, согласно стандарту POSIX-2001, по сути являются бинарными. Они устроены проще и, следовательно, могут быть реализованы эффективнее. Главное - отсутствуют тяжеловесные, со сложной семантикой, трудные для реализации групповые операции.
Использование семафоров реального времени проиллюстрируем программой, реализующей взаимодействие поставщик/потребитель).
Листинг 4.17. Пример программы, реализующей взаимодействие поставщик/потребитель. Применение неименованных семафоров в данном случае представляется вполне естественным. Поставщик захватывает семафор записи, заполняет буфер и освобождает семафор, разрешающий чтение. Потребитель действует симметричным образом.
Отметим, что семафоры ликвидируются после терминирования использующих их потоков управления, что, согласно стандарту POSIX-2001, является безопасным.
Поясним и обсудим сделанное выше замечание о том, что, согласно стандарту POSIX-2001, семафоры реального времени по сути являются бинарными. Такой вывод можно сделать по двум фразам из описания функции sem_trywait(). Во-первых, функция sem_trywait() захватывает семафор только в том случае, если он еще не захвачен, то есть если значение семафора положительно. Во-вторых, в случае успешного завершения семафор захватывается и должен оставаться в этом состоянии, пока не будет успешно выполнена функция sem_post().
Здесь ничего не говорится об уменьшении положительного значения без захвата семафора, зато намеки на это имеются в части, где приводятся детальное разъяснение положений стандарта и обоснование принятых решений. Смущает и то, что в описании функции sem_post() не запрещается освобождать свободный семафор, что, естественно, выливается в увеличение его значения. Далее, в разных местах семафоры называются то бинарными, то целочисленными. Наконец, автору не известно ни одной реализации, где семафоры реального времени не были бы целочисленными. В общем, сложное это дело - толкование внутренне противоречивых стандартов, вступающих в конфликт с реальной жизнью...
Если считать, что рассматриваемые семафоры являются целочисленными, приведенную выше программу можно усовершенствовать, сделав буфер кольцевым, с размером, большим единицы (см.
Листинг 4.18. Пример программы, реализующей взаимодействие поставщик/потребитель с помощью целочисленных семафоров. Значение семафора w_sem равно числу элементов буфера, доступных для записи, r_sem - для чтения. Многопотоковым инвариантом программы является сумма этих величин, равная размеру буфера. В начальный момент буфер целиком доступен для записи. После этого вызовы sem_wait() и sem_post() уменьшают значение "своего" и увеличивают значение "чужого" семафора. Поток управления приостанавливается в sem_wait(), когда "свое" значение уменьшается до нуля, у нужно ждать, пока другой поток вызовом sem_post() не увеличит его, сделав положительным.
Разумеется, обсуждение темы семафоров было бы неполным без обеда философов. Мы приведем программу, написанную С.В. Самборским (см). В ней семафоры используются как бинарные, поэтому ее стандартность и мобильность не вызывают сомнений.
Листинг 4.19. Многопотоковый вариант решения задачи об обедающих философах с использованием семафоров реального времени.

Разделяемые сегменты памяти
Разделяемые сегменты памяти, называемые далее объектами в разделяемой памяти, - самый эффективный способ передачи данных между процессами. Одной операцией записи можно передать данные сразу многим процессам, разделяющим тот же объект.
Объекты в разделяемой памяти полезны как для систем, поддерживающих виртуальную память и раздельные адресные пространства для процессов, так и для встроенных систем с минимальной аппаратной поддержкой.
Минимальный мобильный программный интерфейс к объектам в разделяемой памяти включает функции открытия (возможно, с созданием) подобного объекта и получения его дескриптора, а также удаления ранее созданного объекта.
Стандартный программный интерфейс состоит из двух функций: shm_open() и shm_unlink() (см. #include <sys/mman.h>

int shm_open (const char *name,
int oflag, mode_t mode);

int shm_unlink (const char *name);
Листинг 4.20. Описание функций shm_open() и shm_unlink().При открытии с помощью функции shm_open() возвращается файловый дескриптор. Имя (аргумент name) трактуется стандартным для рассматриваемых средств межпроцессного взаимодействия образом. Посредством аргумента oflag могут указываться флаги O_RDONLY, O_RDWR, O_CREAT, O_EXCL и/или O_TRUNC. Если объект создается, то режим доступа к нему формируется в соответствии со значением аргумента mode и маской создания файлов процесса.
После создания объект в разделяемой памяти существует, пока не будет удален функцией shm_unlink(). Он сохраняет свое состояние после закрытия всех ссылающихся на него дескрипторов, однако эффект от перезагрузки системы стандарт POSIX-2001 не специфицирует.
Представляется естественным, что способ доступа к объектам определяется типом дескриптора, возвращаемого при их открытии. Если это адрес, то доступ сводится к операциям чтения/записи из/в память. Если это файловый дескриптор, то для доступа должны использоваться функции файлового ввода/вывода - read(), write() и т.п. Подобное естественное применение объектов в разделяемой памяти иллюстрируется двухпроцессной программой, копирующей строки со стандартного ввода на стандартный вывод (. Предполагается, что заголовочный файл называется "g_shm.h", а файл с образом процесса, запускаемого посредством execl(), - "g_r_shm".
#ifndef g_SHM
#define g_SHM

/* Имя объекта в разделяемой памяти */
#define O_SHM_NAME        "/g_o.shm"

/* Используемый номер сигнала
реального времени */
#define SIG_SHM   SIGRTMIN

/* Используемые значения сигнала
реального времени */
#define SIGVAL_LINE         0
#define SIGVAL_EOF          EOF

#endif
Листинг 4.21. Заголовочный файл "g_shm.h" программы, копирующей строки со стандартного ввода на стандартный вывод. (
Листинг 4.22. Исходный текст начального процесса двухпроцессной программы, копирующей строки со стандартного ввода на стандартный вывод. (
Листинг 4.23. Исходный текст порождаемого процесса (файл g_r_shm.c) двухпроцессной программы, копирующей строки со стандартного ввода на стандартный вывод. (
Объект в разделяемой памяти используется в приведенной программе как буфер на один элемент (одну строку). Чтение и запись производятся с начала объекта, для чего применяется файловое позиционирование. Разумеется, в такой ситуации порожденный процесс не может обычным образом обнаружить конец передаваемого ему файла, поэтому приходится применять дополнительные средства. В данном случае это сигналы реального времени, которые несут двойную нагрузку - обеспечивают синхронизацию доступа к объекту в разделяемой памяти и передают от родительского процесса порожденному информацию о конце файла.
К сожалению, стандарт POSIX-2001 не следует приведенным выше естественным предположениям, касающимся связи между типом дескриптора и способом доступа к объекту, так что приведенная программа, строго говоря, не соответствует стандарту. Дело в том, что файловый дескриптор, возвращаемый функцией shm_open(), является дескриптором "второго сорта" (хотя и первой свежести): результат применения к нему функций fdopen(), read(), write() и т.п. не специфицирован.
Далее мы увидим, для чего этот дескриптор можно употребить и как организовать межпроцессное взаимодействие через разделяемые сегменты памяти реального времени в полном соответствии с положениями стандарта POSIX-2001. Здесь же отметим, что в большинстве реализаций для представления разделяемых сегментов применяются файлы, отображенные в память, так что их дескрипторы вполне пригодны для выполнения файловых операций, а мобильность приведенной программы с практической точки зрения можно считать удовлетворительной.

 

 
На главную | Содержание | < Назад....Вперёд >
С вопросами и предложениями можно обращаться по nicivas@bk.ru. 2013 г.Яндекс.Метрика