[Openais] [PATCH] Message Service - remove queue_group_entry structure
Ryan O'Hara
rohara at redhat.com
Thu Aug 7 14:19:50 PDT 2008
Here is a patch to the OpenAIS message service that removes the
queue_group_entry structure that made the code more complex than
necessary. This structure was essentially a wrapper around the
message_queue structure, and was only used for maintaining a list of
queues within a queue group. Note that instead of using this extra
structure, the queue_group can keep a list of message_queue structs
directly.
This should make the code much easier to maintain, as well as simplify
the synchronization code, which is currently being developed.
On a related note, we should consider updating list.h to include some
of the handy iterators found in the kernel version of this header.
Ryan
-------------- next part --------------
Index: services/msg.c
===================================================================
--- services/msg.c (revision 1626)
+++ services/msg.c (working copy)
@@ -54,11 +54,11 @@
#include <corosync/engine/coroapi.h>
#include <corosync/engine/logsys.h>
#include <corosync/lcr/lcr_comp.h>
+
#include "../include/saAis.h"
#include "../include/saMsg.h"
#include "../include/ipc_msg.h"
-
LOGSYS_DECLARE_SUBSYS ("MSG", LOG_INFO);
enum msg_exec_message_req_types {
@@ -87,24 +87,28 @@
struct message_queue {
SaNameT name;
- int refcount;
- struct list_head list;
- struct list_head message_list_head;
+ SaUint8T refcount;
+ SaMsgQueueGroupChangesT change;
+ struct list_head group_list;
+ struct list_head queue_list;
+ struct list_head message_head;
};
struct queue_group {
SaNameT name;
SaUint8T track_flags;
SaMsgQueueGroupPolicyT policy;
- struct list_head list;
- struct list_head message_queue_head;
+ struct list_head group_list;
+ struct list_head queue_head;
};
+/*
struct queue_group_entry {
SaMsgQueueGroupChangesT change;
struct message_queue *message_queue;
struct list_head list;
};
+*/
/*
struct queue_cleanup {
@@ -116,7 +120,7 @@
*/
DECLARE_LIST_INIT(queue_list_head);
-DECLARE_LIST_INIT(queue_group_list_head);
+DECLARE_LIST_INIT(group_list_head);
static struct corosync_api_v1 *api;
@@ -686,34 +690,34 @@
static void print_message_list (struct message_queue *queue)
{
struct list_head *list;
- struct message_entry *entry;
+ struct message_entry *message;
- for (list = queue->message_list_head.next;
- list != &queue->message_list_head;
+ for (list = queue->message_head.next;
+ list != &queue->message_head;
list = list->next)
{
- entry = list_entry (list, struct message_entry, list);
+ message = list_entry (list, struct message_entry, list);
log_printf (LOG_LEVEL_NOTICE, "[DEBUG]: print_message_list (%s) (%llu)\n",
- (char *)(entry->message.data),
- (unsigned long long)(entry->time));
+ (char *)(message->message.data),
+ (unsigned long long)(message->time));
}
}
static void print_queue_group_list (struct queue_group *group)
{
struct list_head *list;
- struct queue_group_entry *entry;
+ struct message_queue *queue;
- for (list = group->message_queue_head.next;
- list != &group->message_queue_head;
+ for (list = group->queue_head.next;
+ list != &group->queue_head;
list = list->next)
{
- entry = list_entry (list, struct queue_group_entry, list);
+ queue = list_entry (list, struct message_queue, group_list);
log_printf (LOG_LEVEL_NOTICE, "[DEBUG]: print_queue_group_list (%s) (%u)\n",
- (char *)(entry->message_queue->name.value),
- (unsigned int)(entry->change));
+ (char *)(queue->name.value),
+ (unsigned int)(queue->change));
}
}
@@ -726,7 +730,7 @@
list != &queue_list_head;
list = list->next)
{
- queue = list_entry (list, struct message_queue, list);
+ queue = list_entry (list, struct message_queue, queue_list);
if (name_match (name, &queue->name)) {
return (queue);
@@ -735,16 +739,16 @@
return (0);
}
-static struct queue_group *queue_group_find (SaNameT *name)
+static struct queue_group *group_find (SaNameT *name)
{
struct list_head *list;
struct queue_group *group;
- for (list = queue_group_list_head.next;
- list != &queue_group_list_head;
+ for (list = group_list_head.next;
+ list != &group_list_head;
list = list->next)
{
- group = list_entry (list, struct queue_group, list);
+ group = list_entry (list, struct queue_group, group_list);
if (name_match (name, &group->name)) {
return (group);
@@ -753,19 +757,19 @@
return (0);
}
-static struct queue_group_entry *queue_group_entry_find (struct queue_group *group, struct message_queue *queue)
+static struct message_queue *group_queue_find (struct queue_group *group, SaNameT *name)
{
struct list_head *list;
- struct queue_group_entry *entry;
+ struct message_queue *queue;
- for (list = group->message_queue_head.next;
- list != &group->message_queue_head;
+ for (list = group->queue_head.next;
+ list != &group->queue_head;
list = list->next)
{
- entry = list_entry (list, struct queue_group_entry, list);
+ queue = list_entry (list, struct message_queue, group_list);
- if (entry->message_queue == queue) {
- return (entry);
+ if (name_match (name, &queue->name)) {
+ return (queue);
}
}
return (0);
@@ -777,8 +781,8 @@
unsigned int count = 0;
- for (list = group->message_queue_head.next;
- list != &group->message_queue_head;
+ for (list = group->queue_head.next;
+ list != &group->queue_head;
list = list->next)
{
count++;
@@ -789,17 +793,17 @@
static unsigned int queue_group_change_count (struct queue_group *group)
{
struct list_head *list;
- struct queue_group_entry *entry;
+ struct message_queue *queue;
unsigned int count = 0;
- for (list = group->message_queue_head.next;
- list != &group->message_queue_head;
+ for (list = group->queue_head.next;
+ list != &group->queue_head;
list = list->next)
{
- entry = list_entry (list, struct queue_group_entry, list);
+ queue = list_entry (list, struct message_queue, group_list);
- if (entry->change != SA_MSG_QUEUE_GROUP_NO_CHANGE) {
+ if (queue->change != SA_MSG_QUEUE_GROUP_NO_CHANGE) {
count++;
}
}
@@ -812,7 +816,7 @@
void *buffer)
{
struct list_head *list;
- struct queue_group_entry *entry;
+ struct message_queue *queue;
unsigned int i = 0;
@@ -825,31 +829,29 @@
case SA_TRACK_CURRENT:
case SA_TRACK_CHANGES:
- for (list = group->message_queue_head.next;
- list != &group->message_queue_head;
+ for (list = group->queue_head.next;
+ list != &group->queue_head;
list = list->next)
{
- entry = list_entry (list, struct queue_group_entry, list);
+ queue = list_entry (list, struct message_queue, group_list);
memcpy (&notification[i].member.queueName,
- &entry->message_queue->name,
- sizeof (SaNameT));
- notification[i].change = entry->change;
+ &queue->name, sizeof (SaNameT));
+ notification[i].change = queue->change;
i++;
}
break;
case SA_TRACK_CHANGES_ONLY:
- for (list = group->message_queue_head.next;
- list != &group->message_queue_head;
+ for (list = group->queue_head.next;
+ list != &group->queue_head;
list = list->next)
{
- entry = list_entry (list, struct queue_group_entry, list);
- if (entry->change != SA_MSG_QUEUE_GROUP_NO_CHANGE) {
+ queue = list_entry (list, struct message_queue, group_list);
+ if (queue->change != SA_MSG_QUEUE_GROUP_NO_CHANGE) {
memcpy (&notification[i].member.queueName,
- &entry->message_queue->name,
- sizeof (SaNameT));
- notification[i].change = entry->change;
+ &queue->name, sizeof (SaNameT));
+ notification[i].change = queue->change;
i++;
}
}
@@ -950,13 +952,15 @@
goto error_exit;
}
memset (queue, 0, sizeof (struct message_queue));
-
memcpy (&queue->name,
&req_exec_msg_queueopen->queue_name,
sizeof (SaNameT));
- list_init (&queue->list);
- list_init (&queue->message_list_head);
- list_add (&queue->list, &queue_list_head);
+
+ list_init (&queue->group_list);
+ list_init (&queue->queue_list);
+ list_init (&queue->message_head);
+ list_add (&queue->queue_list, &queue_list_head);
+
queue->refcount = 0;
}
queue->refcount += 1;
@@ -1124,8 +1128,11 @@
struct queue_group *queue_group;
SaAisErrorT error = SA_AIS_OK;
- queue_group = queue_group_find (&req_exec_msg_queuegroupcreate->queue_group_name);
+ log_printf (LOG_LEVEL_NOTICE, "EXEC request: saMsgQueueGroupCreate %s\n",
+ getSaNameT (&req_exec_msg_queuegroupcreate->queue_group_name));
+ queue_group = group_find (&req_exec_msg_queuegroupcreate->queue_group_name);
+
if (queue_group == 0) {
queue_group = malloc (sizeof (struct queue_group));
if (queue_group == 0) {
@@ -1133,13 +1140,13 @@
goto error_exit;
}
memset (queue_group, 0, sizeof (struct queue_group));
-
memcpy (&queue_group->name,
&req_exec_msg_queuegroupcreate->queue_group_name,
sizeof (SaNameT));
- list_init (&queue_group->list);
- list_init (&queue_group->message_queue_head);
- list_add (&queue_group->list, &queue_group_list_head);
+
+ list_init (&queue_group->group_list);
+ list_init (&queue_group->queue_head);
+ list_add (&queue_group->group_list, &group_list_head);
} else {
error = SA_AIS_ERR_EXIST;
}
@@ -1169,7 +1176,7 @@
struct res_lib_msg_queuegrouptrack res_lib_msg_queuegrouptrack;
struct message_queue *queue;
struct queue_group *queue_group;
- struct queue_group_entry *queue_group_entry;
+ /* struct queue_group_entry *queue_group_entry; */
SaMsgQueueGroupNotificationT *notification;
SaAisErrorT error = SA_AIS_OK;
SaAisErrorT error_cb = SA_AIS_OK;
@@ -1177,8 +1184,11 @@
unsigned int change_count = 0;
unsigned int member_count = 0;
- queue_group = queue_group_find (&req_exec_msg_queuegroupinsert->queue_group_name);
+ log_printf (LOG_LEVEL_NOTICE, "EXEC request: saMsgQueueGroupInsert %s\n",
+ getSaNameT (&req_exec_msg_queuegroupinsert->queue_group_name));
+ queue_group = group_find (&req_exec_msg_queuegroupinsert->queue_group_name);
+
if (queue_group == 0) {
error = SA_AIS_ERR_NOT_EXIST;
goto error_exit;
@@ -1190,19 +1200,23 @@
goto error_exit;
}
+ /*
queue_group_entry = malloc (sizeof (struct queue_group_entry));
if (queue_group_entry == 0) {
error = SA_AIS_ERR_NO_MEMORY;
goto error_exit;
- }
+ }
+ */
- list_init (&queue_group_entry->list);
- list_add (&queue_group_entry->list, &queue_group->message_queue_head);
- list_add (&queue->list, &queue_list_head);
+ list_init (&queue->group_list);
+ list_add (&queue->group_list, &queue_group->queue_head);
+ list_add (&queue->queue_list, &queue_list_head);
- queue_group_entry->message_queue = queue;
- queue_group_entry->change = SA_MSG_QUEUE_GROUP_ADDED;
+ /* queue_group_entry->message_queue = queue; */
+ /* queue_group_entry->change = SA_MSG_QUEUE_GROUP_ADDED; */
+ queue->change = SA_MSG_QUEUE_GROUP_ADDED;
+
if (queue_group->track_flags & SA_TRACK_CHANGES) {
member_count = queue_group_member_count (queue_group);
change_count = queue_group_change_count (queue_group);
@@ -1242,8 +1256,10 @@
}
error_track:
- queue_group_entry->change = SA_MSG_QUEUE_GROUP_NO_CHANGE;
+ /* queue_group_entry->change = SA_MSG_QUEUE_GROUP_NO_CHANGE; */
+ queue->change = SA_MSG_QUEUE_GROUP_NO_CHANGE;
+
error_exit:
if (api->ipc_source_is_local(&req_exec_msg_queuegroupinsert->source)) {
res_lib_msg_queuegroupinsert.header.size =
@@ -1301,7 +1317,7 @@
struct res_lib_msg_queuegrouptrack res_lib_msg_queuegrouptrack;
struct queue_group *queue_group;
struct message_queue *queue;
- struct queue_group_entry *queue_group_entry;
+ /* struct queue_group_entry *queue_group_entry; */
SaMsgQueueGroupNotificationT *notification;
SaAisErrorT error = SA_AIS_OK;
SaAisErrorT error_cb = SA_AIS_OK;
@@ -1309,7 +1325,11 @@
unsigned int change_count = 0;
unsigned int member_count = 0;
- queue_group = queue_group_find (&req_exec_msg_queuegroupremove->queue_group_name);
+ log_printf (LOG_LEVEL_NOTICE, "EXEC request: saMsgQueueGroupRemove %s\n",
+ getSaNameT (&req_exec_msg_queuegroupremove->queue_group_name));
+
+ queue_group = group_find (&req_exec_msg_queuegroupremove->queue_group_name);
+
if (queue_group == 0) {
error = SA_AIS_ERR_NOT_EXIST;
goto error_exit;
@@ -1321,14 +1341,18 @@
goto error_exit;
}
+ /*
queue_group_entry = queue_group_entry_find (queue_group, queue);
if (queue_group_entry == 0) {
error = SA_AIS_ERR_NOT_EXIST;
goto error_exit;
}
+ */
- queue_group_entry->change = SA_MSG_QUEUE_GROUP_REMOVED;
+ /* queue_group_entry->change = SA_MSG_QUEUE_GROUP_REMOVED; */
+ queue->change = SA_MSG_QUEUE_GROUP_REMOVED;
+
if (queue_group->track_flags & SA_TRACK_CHANGES) {
member_count = queue_group_member_count (queue_group);
change_count = queue_group_change_count (queue_group);
@@ -1368,10 +1392,12 @@
}
error_track:
- queue_group_entry->change = SA_MSG_QUEUE_GROUP_NO_CHANGE;
+ /* queue_group_entry->change = SA_MSG_QUEUE_GROUP_NO_CHANGE; */
- list_del (&queue_group_entry->list);
+ queue->change = SA_MSG_QUEUE_GROUP_NO_CHANGE;
+ list_del (&queue->group_list);
+
error_exit:
if (api->ipc_source_is_local(&req_exec_msg_queuegroupremove->source)) {
res_lib_msg_queuegroupremove.header.size =
@@ -1429,10 +1455,13 @@
struct queue_group *queue_group;
SaAisErrorT error = SA_AIS_OK;
- queue_group = queue_group_find (&req_exec_msg_queuegroupdelete->queue_group_name);
+ log_printf (LOG_LEVEL_NOTICE, "EXEC request: saMsgQueueGroupDelete %s\n",
+ getSaNameT (&req_exec_msg_queuegroupdelete->queue_group_name));
+ queue_group = group_find (&req_exec_msg_queuegroupdelete->queue_group_name);
+
if (queue_group) {
- list_del (&queue_group->list);
+ list_del (&queue_group->group_list);
free (queue_group);
} else {
error = SA_AIS_ERR_NOT_EXIST;
@@ -1467,8 +1496,11 @@
SaMsgQueueGroupNotificationT *notification;
- queue_group = queue_group_find (&req_exec_msg_queuegrouptrack->queue_group_name);
+ log_printf (LOG_LEVEL_NOTICE, "EXEC request: saMsgQueueGroupTrack %s\n",
+ getSaNameT (&req_exec_msg_queuegrouptrack->queue_group_name));
+ queue_group = group_find (&req_exec_msg_queuegrouptrack->queue_group_name);
+
if (queue_group == 0) {
error = SA_AIS_ERR_NOT_EXIST;
goto error_exit;
@@ -1478,9 +1510,6 @@
change_count = queue_group_change_count (queue_group);
if (req_exec_msg_queuegrouptrack->track_flags & SA_TRACK_CURRENT) {
- /* DEBUG */
- log_printf (LOG_LEVEL_NOTICE, "[DEBUG]: SA_TRACK_CURRENT\n");
-
notification = malloc (sizeof (SaMsgQueueGroupNotificationT) * member_count);
if (notification == NULL) {
@@ -1495,14 +1524,10 @@
}
if (req_exec_msg_queuegrouptrack->track_flags & SA_TRACK_CHANGES) {
- /* DEBUG */
- log_printf (LOG_LEVEL_NOTICE, "[DEBUG]: SA_TRACK_CHANGES\n");
queue_group->track_flags = req_exec_msg_queuegrouptrack->track_flags;
}
if (req_exec_msg_queuegrouptrack->track_flags & SA_TRACK_CHANGES_ONLY) {
- /* DEBUG */
- log_printf (LOG_LEVEL_NOTICE, "[DEBUG]: SA_TRACK_CHANGES_ONLY\n");
queue_group->track_flags = req_exec_msg_queuegrouptrack->track_flags;
}
@@ -1575,8 +1600,11 @@
struct queue_group *queue_group;
SaAisErrorT error = SA_AIS_OK;
- queue_group = queue_group_find (&req_exec_msg_queuegrouptrackstop->queue_group_name);
+ log_printf (LOG_LEVEL_NOTICE, "EXEC request: saMsgQueueGroupTrackStop %s\n",
+ getSaNameT (&req_exec_msg_queuegrouptrackstop->queue_group_name));
+ queue_group = group_find (&req_exec_msg_queuegrouptrackstop->queue_group_name);
+
if (queue_group == 0) {
error = SA_AIS_ERR_NOT_EXIST;
goto error_exit;
@@ -1650,7 +1678,7 @@
entry->time = clust_time_now();
- list_add_tail (&entry->list, &queue->message_list_head);
+ list_add_tail (&entry->list, &queue->message_head);
error_exit:
@@ -1716,18 +1744,18 @@
goto error_exit;
}
- if (list_empty (queue->message_list_head.next)) {
+ if (list_empty (queue->message_head.next)) {
error = SA_AIS_ERR_TIMEOUT; /* FIX ME */
goto error_exit;
}
- entry = list_entry (queue->message_list_head.next, struct message_entry, list);
+ entry = list_entry (queue->message_head.next, struct message_entry, list);
if (entry == NULL) {
error = SA_AIS_ERR_LIBRARY; /* FIX ME */
goto error_exit;
}
- list_del (queue->message_list_head.next);
+ list_del (queue->message_head.next);
error_exit:
More information about the Openais
mailing list