учебники, программирование, основы, введение в,

 

Технология публикация-подписка для WebSphere MQ

Общие сведения о модели публикация-подписка
Механизм публикация-подписка (Publish/Subscribe) позволяет поставлять информацию от поставщика к потребителю. Эта модель стала особенно популярной в последние годы благодаря тому, что часто меняющаяся информация может поставляться постоянно многим получателям. Одним из типичных примеров такой информации являются данные на рынке акций и валют. В такой модели издателю необязатеьно знать о местонахождении получателя и наоборот. В модели Request/Reply движение информации начинается по запросу потребителя (клиента). В модели Publish/Subscribe движение информации начинается по мере ее появления и поступления от поставщика.
Поставщика информации принято называть издатель (publisher). издатель предлагает информацию на определенные темы. Потребитель информации называется подписчиком (subscriber), он подписывается на получение информации на определенные темы. Информация, которую получает один подписчик, может передаваться другим подписчикам.
Информация, посылаемая как сообщение, характеризуется темой. издатель посылает информацию только на те темы, на которые сделана подписка. Взаимодействие между издателями и подписчиками контролируется посредником или брокером (broker). Взаимосвязанные темы группируются совместно в потоки (stream). издатель может применять потоки для ограничения диапазонов тем издателей и подписчиков. Через брокера идет поток, который использует все темы. На представлена схема, иллюстрирующая взаимодействие брокеров, издателей и подписчиков.
Общий подход работы механизма Publish/Subscribe лучше всего иллюстрируется на примере трех видов сценариев, которые реализуются на основе командных сообщений.
Сценарий издатель-брокер.

  1. издатель регистрирует свое намерение публиковать информацию по определенным темам.
  2. издатель посылает сообщение-публикацию брокеру, содержащее дату публикации. Сообщение может быть перенаправлено брокером непосредственно подписчикам, или может храниться у брокера, пока его не востребует подписчик.
  3. издатель может послать сообщение брокеру, чтобы хранящаяся у него публикация была удалена.
  4. издатель может отказаться от регистрации у брокера, когда он заканчивает отправку сообщений на определенную тему.

Сценарий подписчик-брокеру.

  1. подписчик регистрируется у брокера, определяя темы, которые его интересуют.
  2. брокер начинает посылать подписчику публикации на заданные темы. подписчик может потребовать выдать публикации, хранящиеся у брокера.

Сценарий брокер-брокер реализуется в виде следующих взаимодействий

  1. брокеры могут обмениваться регистрациями подписчиков и прекращать регистрации.
  2. брокеры могут обмениваться публикациями и требованиями на удаление публикаций.
  3. брокеры могут обмениваться информацией о самих себе.

Механизм Publish/Subscribe поддерживает брокеров на основе функций WebSphere MQ на платформах для AIX, HP-UX, Linux, Windows NT, Microsoft Windows 2000 и Sun Solaris. Допустимо иметь по одному брокеру на менеджере очередей. брокер имеет то же самое имя, что и менеджер. Приложения могут писаться на основе стандартных приемов программирования с использованием технологий Message Queue Interface (MQI) или Application Messaging Interface (AMI). Издатели и подписчики могут быть на любых платформах, на которых поддерживается WebSphere MQ, например, издатель - на OS/390 и подписчик - на OS/2. брокеры могут объединяться в сеть, в которой есть корневой брокер, брокеры-родители и брокеры-дети.
Для работы с механизмом Publish/Subscribe существует несколько основных функций или, иначе называемых, командных сообщений, в которых помещается команда брокеру. Синтаксис этих функций имеет простые правила при определении темы. Тема задается строкой длиной не более 256 байт без пробелов и двойных кавычек ("). Тема и подтемы разделяются символом "/" (без пробелов), например, для рынка акций формат темы задается следующим образом:
Регион/СекторРынка/Компания
Конкретные темы выглядят следующим образом:
NewYork/InformationTechnology/IBM
Этот синтаксис разрешает в строках иметь символы:
* - означающий ноль или любое количество
произвольных символов;
? – означающий один произвольный символ.
Данный синтаксис позволит в дальнейшем определить, например, такие темы для подписки на рынке акций как:
*               получение всех цен на акции
со всех рынков мира;
London/*        получение всех цен на акции
с Лондонской биржи;
NewYork/Banks/* получение цен акций всех
нью-йоркских банков;
*/*/IBM         получение всех цен на акции
компании ИБМ на всех рынках мира.
Символ % позволяет использовать специальные символы * и ? в строках, например, 'ABC%*D' означает 'ABC*D'.
В числе основных функций для работы с механизмом Publish/Subscribe (MQPSCommand) следует назвать:


RegPub

Регистрация издателя (Register Publisher)

RegSub

Регистрация подписчика (Register Subscriber)

Publish

Публикация

ReqUpdate

Запрос от издателя к брокеру на обновление публикации на заданную тему (Request Update)

DeletePub

Удаление публикации (Delete Publication)

DeregPub

Отказ от регистрации издателя (Deregister Publisher)

DeregSub

Отказ от регистрации подписчика (Deregister Subscriber)

Более подробно эти функции будут рассмотрены на примере ниже, а описание их форматов можно найти в документации.
Для осуществления работы издателя и подписчика используется формат MQRFH (Rules and Formatting Header - RF Header) WebSphere MQ сообщения. формат MQRFH представляет собой структуру заголовка переменной длины, в которую приложение размещает данные в виде префикса при публикации. В эту структуру входят следующие переменные:
typedef struct tagMQRFH {
MQCHAR4 StrucId;          
/* Идентификатор структуры */
MQLONG Version;          
/* Номер версии структуры */
MQLONG StrucLength;      
/* Общая длина MQRFH */
MQLONG Encoding;         
/* Data encoding */
MQLONG CodedCharSetId;   
/* Идентификатор множества перекодировки */
MQCHAR8 Format;          
/* Имя формата */
MQLONG Flags;            
/* Флаги */
} MQRFH;
Этот заголовок также включает строку NameValueString, в которой приложение публикации или подписки помещает команды, которые должен выполнить брокер. Поле StrucLength в заголовке определяет длину структуры заголовка, включительно с переменной длины NameValueString в конце структуры. Поля Encoding, CodedCharSetId и Format описывают структуру данных в заголовке публикации.

Технология разработки приложений для модели публикация-подписка
Задача создания собственных приложений Издателя и подписчика рассматривается на следующей упрощенной модели: один издатель, один брокер и один подписчи. Будет использоваться формат сообщений WebSphere MQ – MQRFH и классический MQI интерфейс. Для создания приложений необходимо иметь руководство по командам и опциям Publish/Subscribe, а также некоторые знания по программированию на С.
Приложение Издателя должно публиковать символьные строки, задаваемые пользователем на определенные темы. Следовательно, такая программа с именем publisher может иметь следующий формат запуска:
publisher  Topic  Command  QMgrName  PubQueue
где Topic – тема публикации, символьная строка длиной не более 256 байт; Command – команды Издателя для брокера (RegPub, Publish, ReqUpdate, DeletePub, DeregPub), PubQueue - очередь издателя. Текст публикации вводится в командном окне с запущенной программой.
Приложение подписчика должно иметь возможность зарегистрироваться (подписаться) на заданные темы и отображать результаты полученных публикаций. Программа подписчика с именем subscriber может иметь формат запуска:
subscriber  Topic    QMgrName  SubQueue
где Topic – тема подписки, SubQueue - очередь подписчика. Текст публикации на тему Topic отображается в командном окне с запущенной программой subscriber.
Для работы приложений требуется создание очередей: одна у Издателя и одна у подписчика, наименования которых, соответственно, Publisher_queue и Subscriber_queue. издатель будет получать сообщения от брокера через Publisher_queue. подписчик - регистрировать темы и получать публикации через Subscriber_queue. Создание этих очередей осуществляется простой командой runmqsc:
runmqsc QMgrName
define qlocal(имя очереди)
end
Следует рассмотреть потоки сообщений и то, как сообщения по определенной теме находят своего подписчика.

  1. подписчик посылает заявки на подписку в управляющую очередь брокера: SYSTEM.BROKER.CONTROL.QUEUE, содержащие подписку на определенную тему, например, TestTopic. брокер читает это сообщение и запоминает детали подписки, такие как тема и очередь, в которой подписчик хочет получать сообщения. Потоки являются средством группирования разных тем. Состояния брокера и заявок на подписку запоминаются во внутренних очередях брокера: SYSTEM.BROKER.*.
  2. Приложение-издатель публикует информацию по определенной теме, например, по TestTopic и оно попадает в потоковую очередь брокера. По умолчанию это очередь:
  • SYSTEM.BROKER.DEFAULT.STREAM
  1. Сообщение о публикации читается брокером и направляется в очередь подписчика.
  2. Приложение подписчика читает публикацию из очереди подписчика.

Сообщения публикация-подписка имеет время жизни, по истечению которого они удаляются. Сообщения публикация-подписка являются постоянными сообщениями (persistent) и восстанавливаются после перезагрузки менеджера и/или брокера.
Также как и сообщения WebSphere MQ потоки информации от издателя к подписчику являются асинхронными. Если брокер не работает, то публикация будет оставаться в потоковой очереди до тех пор, пока брокер не стартует. Если приложение подписчика не активно, то публикация будет оставаться в очереди подписчика.
Основные шаги, из которых складывается создание Приложения издателя. Таких шагов для программы publisher будет 7.
Шаг 1. Для того чтобы положить сообщение в очередь потоков брокера, необходимо подключиться к менеджеру брокера и открыть очередь командой MQOPEN для помещения сообщений.
Шаг 2. Для публикации необходимо сформировать MQRFH структуру (см. руководство по WebSphere MQ Publish/Subscribe). Все поля этой структуры должны иметь определенные значения. Некоторые значения MQRFH должны быть изменены по сравнению со значениями по умолчанию перед публикацией или в тот момент, когда мы определим значения этих полей.
Шаг 3. Сразу за структурой MQRFH должна следовать символьная строка NameValueString. Указатель pNameValueString определяет начальную позицию NameValueString в теле сообщения.
Шаг 4. Содержимое строки NameValueString должно включать всю необходимую информацию о публикации и всю необходимую последовательность команд для формирования непрерывной символьной строки нашей публикации. Содержимое строки NameValueString позволяет брокеру определить Publish/Subscribe команды и порядок их обработки.
Шаг 5. Поле StrucLength должно содержать длину MQRFH структуры и сопровождающей его строки NameValueString. Длина MQRFH фиксирована и длина NameValueString – переменная. Значение StrucLength позволяет не применять разделитель конца строки в NameValueString, хотя и он может быть использован, если это необходимо приложению-подписчику. MQRFH и NameValueString выравниваются на границе 16 байт автоматически.
Шаг 6. Данные о публикации (в данном случае строка данных для публикации) помещаются сразу после MQFRH и NameValueString структуры, указатель pUserData должен быть установлен на начальную позицию строки введенных данных о публикации.
Шаг 7. Осуществляется процедура тестирования работы программы. Программа компилируется, устраняются ошибки компиляции. Она вызывается, например, командой:
publisher  TestTopic  Publish  QM_broker
Publisher_queue
В командном окне программы publisher вводится сообщение по теме: Hello
На этом этапе еще нет приложения подписчика, поэтому необходимо:

  • проверить, нет ли сообщений об ошибках от брокера в очереди Publisher_queue и исправить это; есть ли успешные сообщения от брокера;
  • посмотреть сгенерированное сообщение о публикации в очереди SYSTEM.DEFAULT.LOCAL.STREAM, используя MQ explorer или команду amqsbcg и убедиться, что в формате сообщения о публикации нет ошибок.

Основные шаги, из которых складывается создание Приложения подписчика.
Шаг 1. Прежде всего, командой MQOPEN необходимо открыть очередь брокера для того, чтобы затем положить командное сообщение в соответствующую очередь.
Шаг 2. На этом шаге необходимо зарегистрировать интересы в виде темы и послать брокеру запрос на регистрацию подписки. Поскольку команды из основной программы будут посылаться брокеру довольно часто, необходимо разработать функцию SendBrokerCommand, одним из аргументов которой является командная строка для помещения в NameValueString командного сообщения. Эта функция SendBrokerCommand должна быть аналогичной по коду, как и для приложения Издателя.
Шаг 3. Теперь, когда подписчик зарегистрирован, можно ждать поступление публикаций в очередь подписчика, и как только они поступят, их можно прочесть командой MQGET. Первое, что необходимо сделать при получении сообщения, это проверить, что оно в нужном формате MQRFH.
Шаг 4. После распознавания формата MQRFH сообщения, можно выделить порцию наиболее интересного сообщения. В данном случае нет необходимости смотреть на строку NameValueString так как интересуют данные, которые следуют за этой строкой, они задаются указателем pUserData и могут быть напечатаны на экране или выведены в файл.
Шаг 5. На этом последнем шаге необходимо отказаться от регистрации и, как это делалось раньше на шаге 2, вызвать функцию SendBrokerCommand, добавив соответствующую команду для отказа от регистрации. Теперь, так же как и раньше, надо скомпилировать код и затем исправить ошибки компиляции. После этого программа готова для работы, ее можно выполнить:
subscriber  TestTopic    QM_broker 
Subscriber_queue
Этот запуск должен отобразить информацию об успешной регистрации у брокера. Если сообщение об успешной регистрации не поступило, то следует посмотреть сообщение об ошибке и исправить ее. После того как прошла успешная регистрация, можно попробовать получить сообщение по подписке. Для этого запустить программу publisher на тему TestTopic с сообщением, например, PublisherReady. В результате, в программе-подписчике должна появиться информация: PublisherReady.
Если этого не произойдет, то необходимо найти ошибку.
Получив успешно одну публикацию от одного издателя, можно расширить эксперимент и попробовать запустить много программ-издателей и программ-подписчиков. Каждой из них понадобиться своя очередь. Если сделать публикации на разные темы, можно ожидать получение определенных сообщений на экранах подписчиков, подписавшихся на соответствующие темы.

Примеры работы механизмов публикация-подписка
Теперь после знакомства с технологией публикация-подписка следует рассмотреть работу модели Publish/Subscribe на простых примерах. Для этого понадобиться инсталлировать SupportPacs MA0C: WebSphere MQ (WebSphere MQ) Publish/Subscribe с сайта
После этого можно стартовать брокер на менеджере очередей командой:
strmqbrk -m QMgrName
Для отображения состояния брокера можно использовать команду dspmqbrk:
dspmqbrk -m QMgrName
В ответ появиться следующее сообщение:
WebSphere MQ message broker for queue manager QMgrName running
Теперь брокер готов получать команды от издателей и подписчиков.
На каждом менеджере может быть стартован только один брокер.
В менеджере есть необходимые системные очереди, которые можно увидеть командой
runmqsc QMgrName
display qlocal(SYSTEM.BROKER.*)
end
Следует сразу отметить, что завершение работы брокера осуществляется командой endmqbrk перед окончанием работы менеджера: endmqbrk -m QMgrName.
Работу издателя можно продемонстрировать с помощью программы amqsgama, предложенной в SupportPacs MA0C в качестве теста. Эта программа издателя из перечня спортивных тем для подписки () помещает на брокер сообщения о футбольных матчах (Sport/Soccer/Event/ - в таблице данная тема выделена курсивом) и проверяет ответы брокера.


Таблица 10.1. Возможные спортивные темы для подписки

спорт/футбол/*
спорт/теннис/*
спорт/баскетбол/*

спорт/футбол/расписание игр
спорт/футбол/события
спорт/футбол/обзоры

Формат запуска программы:
amqsgama TeamName1 TeamName2 QMgrName
Результаты работы программы, моделирующей случайным образом забивание голов той или иной командой, выглядит следующим образом
Для работы программы необходимо создание очереди: SAMPLE.BROKER.RESULTS.STREAM.
Именно в эту очередь поступают сообщения от издателя. Необходимо также, чтобы был запущен брокер. Программа подписчика должна стартовать раньше, чем программа издателя amqsgama, чтобы отобразить результаты игры полностью. Все используемые функции в программе служат для подключения к менеджеру брокера и публикации событий о начале матча, окончании матча и забивание гола.
Блочная структура программы выглядит следующим образом.
Подключение к менеджеру брокера (MQCONN)
Открытие очереди потока брокера (MQOPEN)
Инициализация таймера матча
Генерация MQRFH для публикации события о
начале матча
Добавление имен команд в данные
Помещение публикации в очередь потоков
Начало цикла по времени матча:
засыпание на случайный период
попытка забить гол (50% вероятность)
генерация публикации о забитом голе
(RFH для ScoreUpdate)
случайный выбор команды, забившей гол
добавление имени команды в данные для
публикации
помещение публикации в очередь потоков
Окончание цикла по времени матча
Генерация MQRFH для публикации события о
конце матча
Добавление имен команд для публикации
Помещение публикации в очередь потоков
Закрытие очереди потока брокера (MQCLOSE)
Отключение от менеджера брокера (MQDISC)
Программа amqsgama имеет следующий код:
В качестве комментария следует отметить, что функция BuildMQRFHeader формирует значения по умолчанию для заголовка MQRFH, устанавливает параметры format и CCSID пользовательских данных. В строку NameValueString добавляются команды, тема и опции для публикации и она выравнивается на 16-ти байтовую границу. StrucLength в MQRFH устанавливается как общая длина. Входными параметрами функции являются pStart – начало блока сообщения, TopicType[] – строка с именем темы. Входным и выходным параметром одновременно является pDataLength – размер блока сообщения при входе и размер выходного блока информации.
Функция PutPublication формирует сообщение для вывода в очередь брокера с помощью команды MQPUT. Входными параметрами функции являются hConn – идентификатор менеджера для команды MQHCONN, hObj – идентификатор очереди, pMessage – идентификатор на начало блока сообщения, messageLength – длина данных в сообщении. Выходными параметрами функции являются pCompCode и pReason – коды завершения команды MQPUT.
Работу подписчика можно продемонстрировать с помощью программы amqsresa из состава SupportPacs MA0C, которая подписывается у брокера на заданную тему (футбол) и получает сообщения от брокера. Формат запуска программы:
amqsresa QMgrName
где QmgrName – имя менеджера очередей, на котором запущен брокер.
Необходимая тема подписки (TOPIC = "Sport/Soccer/*") и очереди для подписчика заданы в теле программы (эти параметры рекомендуется выносить в командную строку или файл инициализации для создания универсальных программ – примеч. автора).
Для работы программы необходимо создание очередей:
runmqsc QMgrName
define qlocal(SAMPLE.BROKER.RESULTS.STREAM)
define qlocal(RESULTS.SERVICE.SAMPLE.QUEUE)
define qlocal(SYSTEM.BROKER.CONTROL.QUEUE)
end
Результаты работы программы amqsresa, получающей сообщения от брокера, выглядят следующим образом
Объем программы подписчика amqsresa более 2000 строк (в том числе комментариев – 40%) и это не позволяет привести ее в данной лекции. Стоит ограничиться лишь кратким алгоритмом.
Подключение к менеджеру брокера (MQCONN )
Открытие очереди потока брокера и подписчика
(MQOPEN )
Генерация MQRFH и подписка на все события
Ожидание появления сообщений в очереди
подписчика (до 3 минут)
Извлечение из очереди MQGET всех публикаций
Отбор публикации по теме "Sport/Soccer/*”
Обработка и отображение результатов публикации
в зависимости от событий (начало матча,
конец матча, изменение счета)
Выход из цикла по концу матча или по таймеру
Закрытие очереди потока брокера и подписчика
(MQCLOSE)
Отключение от менеджера брокера (MQDISC)
В комментарии к алгоритму следует отметить, что подписка на все события (в алгоритме выделено курсивом) вряд ли объяснима, скорее это сделано в учебных целях. подписчику нет необходимости получать все публикации, поэтому в команде регистрации подписчика (RegSub) целесообразно осуществлять подписку сразу на заданную тему.
В заключение лекции можно привести график времени доставки публикации (мсек) в зависимости от количества подписчиков, полученный на компьютере RISC/6000, 200MHz, 1 GB RAM с операционной системой AIX 4.3.0. Этот график показывает, что механизм Publish/Subscribe обеспечивает более высокую производительность, чем приложения, созданные на основе классического подхода с помощью MQI интерфейса и Distribution List.

 

 
На главную | Содержание | < Назад....Вперёд >
С вопросами и предложениями можно обращаться по nicivas@bk.ru. 2013 г.Яндекс.Метрика