[Openais] [PATCH] Fix round robin message delivery for queue groups

Ryan O'Hara rohara at redhat.com
Thu Aug 14 14:42:29 PDT 2008


Here is a patch that fixes round robin message delivery for queue
groups. Please note that the only queue group policy currently supported
by the message service is round robin.

This patch keeps track of which queue is next (for message delivery)
within each queue group (struct message_queue *rr_queue). Each time a
message is sent to a queue group we increment the rr_queue pointer to
the next queue in the group (see next_rr_queue function).

Also checks the current rr_queue pointer when removing a queue from a
group, such that we increment the rr_queue pointer if we happen to be
removing the queue that it currently points to.

Also included a few fixes for compiler warnings.

-------------- next part --------------
Index: test/testmsg2.c
===================================================================
--- test/testmsg2.c	(revision 1634)
+++ test/testmsg2.c	(working copy)
@@ -128,6 +128,15 @@
 	strcpy (name->value, str);
 }
 
+void setSaMsgMessageT (SaMsgMessageT *message, char *data) { /* test helper: fill every field of *message from the C string data */
+	message->type = 1;
+	message->version = 2;
+	message->size = strlen (data) + 1; /* byte count includes the terminating NUL */
+	message->senderName = NULL;
+	message->data = strdup (data); /* heap copy of data; the result is not checked and the previous message->data value is overwritten, not freed */
+	message->priority = 0;
+}
+
 int main (void)
 {
 	int result;
@@ -135,6 +144,7 @@
 	SaSelectionObjectT select_obj;
 
 	SaMsgHandleT handle;
+	SaMsgMessageT message;
 
 	SaMsgQueueHandleT queue_handle_a;
 	SaMsgQueueHandleT queue_handle_b;
@@ -295,12 +305,120 @@
 	printf ("[DEBUG]: (%d) saMsgDispatch\n", result);
 
 	/*
+	* Send messages to GROUP_ONE
+	*/
+
+	setSaMsgMessageT (&message, "test_msg_01a");
+	result = saMsgMessageSend (handle, &queue_group_one, &message,
+				   SA_TIME_ONE_SECOND);
+	printf ("[DEBUG]: (%d) saMsgMessageSend { group: %s + message: %s }\n",
+		result, (char *)(queue_group_one.value), (char *)(message.data));
+
+	setSaMsgMessageT (&message, "test_msg_01b");
+	result = saMsgMessageSend (handle, &queue_group_one, &message,
+				   SA_TIME_ONE_SECOND);
+	printf ("[DEBUG]: (%d) saMsgMessageSend { group: %s + message: %s }\n",
+		result, (char *)(queue_group_one.value), (char *)(message.data));
+
+	setSaMsgMessageT (&message, "test_msg_01c");
+	result = saMsgMessageSend (handle, &queue_group_one, &message,
+				   SA_TIME_ONE_SECOND);
+	printf ("[DEBUG]: (%d) saMsgMessageSend { group: %s + message: %s }\n",
+		result, (char *)(queue_group_one.value), (char *)(message.data));
+
+	/*
+	* Remove QUEUE_A (next rr_queue) from GROUP_ONE
+	*/
+
+	result = saMsgQueueGroupRemove (handle, &queue_group_one, &queue_name_a);
+	printf ("[DEBUG]: (%d) saMsgQueueGroupRemove { group: %s - queue: %s }\n",
+		result, (char *)(queue_group_one.value), (char *)(queue_name_a.value));
+
+	setSaMsgMessageT (&message, "test_msg_01d");
+	result = saMsgMessageSend (handle, &queue_group_one, &message,
+				   SA_TIME_ONE_SECOND);
+	printf ("[DEBUG]: (%d) saMsgMessageSend { group: %s + message: %s }\n",
+		result, (char *)(queue_group_one.value), (char *)(message.data));
+
+	setSaMsgMessageT (&message, "test_msg_01e");
+	result = saMsgMessageSend (handle, &queue_group_one, &message,
+				   SA_TIME_ONE_SECOND);
+	printf ("[DEBUG]: (%d) saMsgMessageSend { group: %s + message: %s }\n",
+		result, (char *)(queue_group_one.value), (char *)(message.data));
+
+	/*
+	* Send messages to GROUP_TWO
+	*/
+
+	setSaMsgMessageT (&message, "test_msg_02a");
+	result = saMsgMessageSend (handle, &queue_group_two, &message,
+				   SA_TIME_ONE_SECOND);
+	printf ("[DEBUG]: (%d) saMsgMessageSend { group: %s + message: %s }\n",
+		result, (char *)(queue_group_two.value), (char *)(message.data));
+
+	setSaMsgMessageT (&message, "test_msg_02b");
+	result = saMsgMessageSend (handle, &queue_group_two, &message,
+				   SA_TIME_ONE_SECOND);
+	printf ("[DEBUG]: (%d) saMsgMessageSend { group: %s + message: %s }\n",
+		result, (char *)(queue_group_two.value), (char *)(message.data));
+
+	setSaMsgMessageT (&message, "test_msg_02c");
+	result = saMsgMessageSend (handle, &queue_group_two, &message,
+				   SA_TIME_ONE_SECOND);
+	printf ("[DEBUG]: (%d) saMsgMessageSend { group: %s + message: %s }\n",
+		result, (char *)(queue_group_two.value), (char *)(message.data));
+
+	/*
+	* Remove QUEUE_X (next rr_queue) from GROUP_TWO
+	*/
+
+	result = saMsgQueueGroupRemove (handle, &queue_group_two, &queue_name_x);
+	printf ("[DEBUG]: (%d) saMsgQueueGroupRemove { group: %s - queue: %s }\n",
+		result, (char *)(queue_group_two.value), (char *)(queue_name_x.value));
+
+	setSaMsgMessageT (&message, "test_msg_02d");
+	result = saMsgMessageSend (handle, &queue_group_two, &message,
+				   SA_TIME_ONE_SECOND);
+	printf ("[DEBUG]: (%d) saMsgMessageSend { group: %s + message: %s }\n",
+		result, (char *)(queue_group_two.value), (char *)(message.data));
+
+	setSaMsgMessageT (&message, "test_msg_02e");
+	result = saMsgMessageSend (handle, &queue_group_two, &message,
+				   SA_TIME_ONE_SECOND);
+	printf ("[DEBUG]: (%d) saMsgMessageSend { group: %s + message: %s }\n",
+		result, (char *)(queue_group_two.value), (char *)(message.data));
+
+	/*
+	* Attempt to remove a queue from GROUP_ONE that is not a member.
+	* Should return SA_AIS_ERR_NOT_EXIST (12).
+	*/
+
+	/*
+	result = saMsgQueueGroupRemove (handle, &queue_group_one, &queue_name_z);
+	printf ("[DEBUG]: (%d)=12 saMsgQueueGroupRemove { group: %s - queue: %s }\n",
+		result, (char *)(queue_group_one.value), (char *)(queue_name_z.value));
+	*/
+
+	/*
+	* Attempt to remove a queue from GROUP_TWO that is not a member.
+	* Should return SA_AIS_ERR_NOT_EXIST (12).
+	*/
+
+	/*
+	result = saMsgQueueGroupRemove (handle, &queue_group_two, &queue_name_c);
+	printf ("[DEBUG]: (%d)=12 saMsgQueueGroupRemove { group: %s - queue: %s }\n",
+		result, (char *)(queue_group_two.value), (char *)(queue_name_c.value));
+	*/
+
+	/*
 	* Remove queues from GROUP_ONE
 	*/
 
+	/*
 	result = saMsgQueueGroupRemove (handle, &queue_group_one, &queue_name_a);
 	printf ("[DEBUG]: (%d) saMsgQueueGroupRemove { group: %s - queue: %s }\n",
 		result, (char *)(queue_group_one.value), (char *)(queue_name_a.value));
+	*/
 
 	result = saMsgQueueGroupRemove (handle, &queue_group_one, &queue_name_b);
 	printf ("[DEBUG]: (%d) saMsgQueueGroupRemove { group: %s - queue: %s }\n",
@@ -314,9 +432,11 @@
 	* Remove queues from GROUP_TWO
 	*/
 
+	/*
 	result = saMsgQueueGroupRemove (handle, &queue_group_two, &queue_name_x);
 	printf ("[DEBUG]: (%d) saMsgQueueGroupRemove { group: %s - queue: %s }\n",
 		result, (char *)(queue_group_two.value), (char *)(queue_name_x.value));
+	*/
 
 	result = saMsgQueueGroupRemove (handle, &queue_group_two, &queue_name_y);
 	printf ("[DEBUG]: (%d) saMsgQueueGroupRemove { group: %s - queue: %s }\n",
Index: services/msg.c
===================================================================
--- services/msg.c	(revision 1634)
+++ services/msg.c	(working copy)
@@ -98,6 +98,7 @@
 	SaNameT name;
 	SaUint8T track_flags;
 	SaMsgQueueGroupPolicyT policy;
+	struct message_queue *rr_queue;
 	struct list_head group_list;
 	struct list_head queue_head;
 };
@@ -687,6 +688,7 @@
 	return;
 }
 
+#if 0
 static void print_message_list (struct message_queue *queue)
 {
 	struct list_head *list;
@@ -703,7 +705,9 @@
 			    (unsigned long long)(message->time));
 	}
 }
+#endif
 
+#if 0
 static void print_queue_group_list (struct queue_group *group)
 {
 	struct list_head *list;
@@ -720,6 +724,7 @@
 			    (unsigned int)(queue->change));
 	}
 }
+#endif
 
 static struct message_queue *queue_find (SaNameT *name)
 {
@@ -757,6 +762,7 @@
 	return (0);
 }
 
+#if 0
 static struct message_queue *group_queue_find (struct queue_group *group, SaNameT *name)
 {
 	struct list_head *list;
@@ -774,6 +780,7 @@
 	}
 	return (0);
 }
+#endif
 
 static unsigned int queue_group_member_count (struct queue_group *group)
 {
@@ -864,6 +871,23 @@
 	return (i);
 }
 
+static struct message_queue *next_rr_queue (struct queue_group *group) /* returns the member that follows group->rr_queue; only reads the group, the caller stores the result */
+{
+	struct message_queue *queue;
+
+	if (group->rr_queue->group_list.next == &group->queue_head) { /* rr_queue's successor is the list head, i.e. rr_queue is the last member */
+		queue = list_entry (group->queue_head.next, /* wrap around to the entry right after the head */
+				    struct message_queue,
+				    group_list);
+	} else {
+		queue =	list_entry (group->rr_queue->group_list.next, /* otherwise step to the following member */
+				    struct message_queue,
+				    group_list);
+	}
+
+	return (queue); /* NOTE(review): group->rr_queue is dereferenced above without a NULL check; if it is the only member this appears to return rr_queue itself - confirm callers handle both */
+}
+
 static int msg_exec_init_fn (struct corosync_api_v1 *corosync_api)
 {
 	api = corosync_api;
@@ -1144,6 +1168,8 @@
 			&req_exec_msg_queuegroupcreate->queue_group_name,
 			sizeof (SaNameT));
 
+		queue_group->policy = req_exec_msg_queuegroupcreate->policy;
+
 		list_init (&queue_group->group_list);
 		list_init (&queue_group->queue_head);
 		list_add (&queue_group->group_list, &group_list_head);
@@ -1176,8 +1202,7 @@
 	struct res_lib_msg_queuegrouptrack res_lib_msg_queuegrouptrack;
 	struct message_queue *queue;
 	struct queue_group *queue_group;
-	/* struct queue_group_entry *queue_group_entry; */
-	SaMsgQueueGroupNotificationT *notification;
+	SaMsgQueueGroupNotificationT *notification = NULL;
 	SaAisErrorT error = SA_AIS_OK;
 	SaAisErrorT error_cb = SA_AIS_OK;
 
@@ -1201,20 +1226,19 @@
 	}
 
 	/*
-	queue_group_entry = malloc (sizeof (struct queue_group_entry));
-	if (queue_group_entry == 0) {
-		error = SA_AIS_ERR_NO_MEMORY;
-		goto error_exit;
+	 * If the policy is SA_MSG_QUEUE_GROUP_ROUND_ROBIN and the
+	 * rr_queue is NULL, then this is the first queue in the group.
+	 */
+	if ((queue_group->policy == SA_MSG_QUEUE_GROUP_ROUND_ROBIN) &&
+	    (queue_group->rr_queue == NULL))
+	{
+		queue_group->rr_queue = queue;
 	}
-	*/
 
 	list_init (&queue->group_list);
 	list_add (&queue->group_list, &queue_group->queue_head);
 	list_add (&queue->queue_list, &queue_list_head);
 
-	/* queue_group_entry->message_queue = queue; */
-	/* queue_group_entry->change = SA_MSG_QUEUE_GROUP_ADDED; */
-
 	queue->change = SA_MSG_QUEUE_GROUP_ADDED;
 
 	if (queue_group->track_flags & SA_TRACK_CHANGES) {
@@ -1256,11 +1280,11 @@
 	}
 
 error_track:
-	/* queue_group_entry->change = SA_MSG_QUEUE_GROUP_NO_CHANGE; */
 
 	queue->change = SA_MSG_QUEUE_GROUP_NO_CHANGE;
 
 error_exit:
+
 	if (api->ipc_source_is_local(&req_exec_msg_queuegroupinsert->source)) {
 		res_lib_msg_queuegroupinsert.header.size =
 			sizeof (struct res_lib_msg_queuegroupinsert);
@@ -1317,8 +1341,7 @@
 	struct res_lib_msg_queuegrouptrack res_lib_msg_queuegrouptrack;
 	struct queue_group *queue_group;
 	struct message_queue *queue;
-	/* struct queue_group_entry *queue_group_entry; */
-	SaMsgQueueGroupNotificationT *notification;
+	SaMsgQueueGroupNotificationT *notification = NULL;
 	SaAisErrorT error = SA_AIS_OK;
 	SaAisErrorT error_cb = SA_AIS_OK;
 
@@ -1342,15 +1365,19 @@
 	}
 
 	/*
-	queue_group_entry = queue_group_entry_find (queue_group, queue);
-	if (queue_group_entry == 0) {
+	 * FIXME: Need to verify that queue being removed
+	 * is actually a member of the queue group.
+	 */
+
+	/*
+	queue = group_queue_find (queue_group,
+				  &req_exec_msg_queuegroupremove->queue_name);
+	if (queue == 0) {
 		error = SA_AIS_ERR_NOT_EXIST;
 		goto error_exit;
 	}
 	*/
 
-	/* queue_group_entry->change = SA_MSG_QUEUE_GROUP_REMOVED; */
-
 	queue->change = SA_MSG_QUEUE_GROUP_REMOVED;
 
 	if (queue_group->track_flags & SA_TRACK_CHANGES) {
@@ -1392,13 +1419,21 @@
 	}
 
 error_track:
-	/* queue_group_entry->change = SA_MSG_QUEUE_GROUP_NO_CHANGE; */
 
 	queue->change = SA_MSG_QUEUE_GROUP_NO_CHANGE;
 
+	/*
+	 * If the queue we are removing is also the next rr_queue,
+	 * we should set rr_queue to the next queue on the list.
+	 */
+	if (queue_group->rr_queue == queue) {
+		queue_group->rr_queue = next_rr_queue (queue_group);
+	}
+
 	list_del (&queue->group_list);
 
 error_exit:
+
 	if (api->ipc_source_is_local(&req_exec_msg_queuegroupremove->source)) {
 		res_lib_msg_queuegroupremove.header.size =
 			sizeof (struct res_lib_msg_queuegroupremove);
@@ -1494,7 +1529,7 @@
 	unsigned int change_count = 0;
 	unsigned int member_count = 0;
 
-	SaMsgQueueGroupNotificationT *notification;
+	SaMsgQueueGroupNotificationT *notification = NULL;
 
 	log_printf (LOG_LEVEL_NOTICE, "EXEC request: saMsgQueueGroupTrack %s\n",
 		getSaNameT (&req_exec_msg_queuegrouptrack->queue_group_name));
@@ -1641,8 +1676,9 @@
 		(struct req_exec_msg_messagesend *)message;
 	struct res_lib_msg_messagesend res_lib_msg_messagesend;
 	struct res_lib_msg_messagesendasync res_lib_msg_messagesendasync;
-	struct message_queue *queue;
-	struct message_entry *entry;
+	struct queue_group *group;
+	struct message_queue *queue = NULL;
+	struct message_entry *entry = NULL;
 	SaAisErrorT error = SA_AIS_OK;
 
 	char *data = ((char *)(req_exec_msg_messagesend) +
@@ -1651,10 +1687,13 @@
 	log_printf (LOG_LEVEL_NOTICE, "EXEC request: saMsgMessageSend %s\n",
 		getSaNameT (&req_exec_msg_messagesend->destination));
 
-	queue = queue_find (&req_exec_msg_messagesend->destination);
-	if (queue == NULL) {
-		error = SA_AIS_ERR_NOT_EXIST;
-		goto error_exit;
+	group = group_find (&req_exec_msg_messagesend->destination);
+	if (group == NULL) {
+		queue = queue_find (&req_exec_msg_messagesend->destination);
+		if (queue == NULL) {
+			error = SA_AIS_ERR_NOT_EXIST;
+			goto error_exit;
+		}
 	}
 
 	entry = malloc (sizeof (struct message_entry));
@@ -1678,7 +1717,13 @@
 
 	entry->time = clust_time_now();
 
-	list_add_tail (&entry->list, &queue->message_head);
+	if (group != NULL) {
+		/* if (group->policy == SA_MSG_QUEUE_GROUP_ROUND_ROBIN) */
+		list_add_tail (&entry->list, &group->rr_queue->message_head);
+		group->rr_queue = next_rr_queue (group);
+	} else {
+		list_add_tail (&entry->list, &queue->message_head);
+	}
 
 error_exit:
 
@@ -1732,7 +1777,7 @@
 		(struct req_exec_msg_messageget *)message;
 	struct res_lib_msg_messageget res_lib_msg_messageget;
 	struct message_queue *queue;
-	struct message_entry *entry;
+	struct message_entry *entry = NULL;
 	SaAisErrorT error = SA_AIS_OK;
 
 	log_printf (LOG_LEVEL_NOTICE, "EXEC request: saMsgMessageGet %s\n",
@@ -1766,8 +1811,14 @@
 			MESSAGE_RES_MSG_MESSAGEGET;
 		res_lib_msg_messageget.header.error = error;
 
-		memcpy (&res_lib_msg_messageget.message, &entry->message,
-			sizeof (SaMsgMessageT));
+		if (error == SA_AIS_OK) {
+			memcpy (&res_lib_msg_messageget.message, &entry->message,
+				sizeof (SaMsgMessageT));
+		} else {
+			memset (&res_lib_msg_messageget.message, 0,
+				sizeof (SaMsgMessageT));
+		}
+
 		memcpy (&res_lib_msg_messageget.source,
 			&req_exec_msg_messageget->source,
 			sizeof (mar_message_source_t));
Index: lib/msg.c
===================================================================
--- lib/msg.c	(revision 1634)
+++ lib/msg.c	(working copy)
@@ -54,7 +54,6 @@
 #include "../include/ipc_msg.h"
 #include "util.h"
 
-
 struct message_overlay {
 	mar_res_header_t header __attribute__((aligned(8)));
 	char data[4096];
@@ -942,6 +941,10 @@
 		return (SA_AIS_ERR_INVALID_PARAM);
 	}
 
+	if (queueGroupPolicy != SA_MSG_QUEUE_GROUP_ROUND_ROBIN) {
+		return (SA_AIS_ERR_NOT_SUPPORTED);
+	}
+
 	/* DEBUG */
 	printf ("[DEBUG]: saMsgQueueGroupCreate { queueGroupName = %s }\n",
 		(char *) queueGroupName->value);
@@ -1232,7 +1235,7 @@
 
 error_exit:
 	pthread_mutex_unlock (&msgInstance->response_mutex);
-error_put_msg:
+/* error_put_msg: */
 	saHandleInstancePut (&msgHandleDatabase, msgHandle);
 
 	return (error == SA_AIS_OK ? res_lib_msg_queuegrouptrack.header.error : error);
@@ -1370,7 +1373,7 @@
 
 error_exit:
 	pthread_mutex_unlock (&msgInstance->response_mutex);
-error_put_msg:
+/* error_put_msg: */
 	saHandleInstancePut (&msgHandleDatabase, msgHandle);
 
 	return (error == SA_AIS_OK ? res_lib_msg_messagesend.header.error : error);
@@ -1441,7 +1444,7 @@
 
 error_exit:
 	pthread_mutex_unlock (&msgInstance->response_mutex);
-error_put_msg:
+/* error_put_msg: */
 	saHandleInstancePut (&msgHandleDatabase, msgHandle);
 
 	return (error == SA_AIS_OK ? res_lib_msg_messagesendasync.header.error : error);
@@ -1521,7 +1524,7 @@
 
 error_exit:
 	pthread_mutex_unlock (msgQueueInstance->response_mutex);
-error_put_msg:
+/* error_put_msg: */
 	saHandleInstancePut (&queueHandleDatabase, queueHandle);
 
 	if (error == SA_AIS_OK)


More information about the Openais mailing list