[Openais] [PATCH] Synchronization for Message Service

Ryan O'Hara rohara at redhat.com
Fri Aug 22 15:54:27 PDT 2008


Attached is a patch to add synchronization capabilities to the message
service. This code should sync queue groups, message queues, and
messages.

A brief word about the design: the first pass of the synchronization code
iterates over queue_list_head, synchronizing all message queues and the
messages within those queues. The second pass handles queue groups. The
queues belonging to those groups will already have been sync'd by the first
pass, so the code only has to create each group and add the correct queues
to it. A simplified sketch of this two-pass scheme follows.
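
The sketch below is a minimal, standalone model of that two-pass scheme.
The structures and function names here (struct queue, struct group,
sync_queues, sync_groups) are invented for the example only; the actual
patch uses the openais list macros, the SaMsg types, and totem multicast,
and drives the two passes from msg_sync_process via the SYNC_STATE_QUEUE
and SYNC_STATE_GROUP states, with only the lowest nodeid of the old
configuration transmitting.

#include <stdio.h>

/* Illustrative stand-ins for the real message queue and queue group
 * structures: each queue holds a NULL-terminated list of messages and
 * each group a NULL-terminated list of member queues. */
struct queue {
	const char *name;
	const char *messages[8];
};

struct group {
	const char *name;
	const struct queue *members[8];
};

/* Pass 1: walk every queue and every message held in that queue. */
static void sync_queues (const struct queue *queues, int count)
{
	int i, j;

	for (i = 0; i < count; i++) {
		printf ("sync queue %s\n", queues[i].name);
		for (j = 0; queues[i].messages[j] != NULL; j++) {
			printf ("  sync message '%s'\n", queues[i].messages[j]);
		}
	}
}

/* Pass 2: walk every group and its membership; the member queues were
 * already sync'd in pass 1, so only the (group, queue) pairing needs to
 * be sent here. */
static void sync_groups (const struct group *groups, int count)
{
	int i, j;

	for (i = 0; i < count; i++) {
		printf ("sync group %s\n", groups[i].name);
		for (j = 0; groups[i].members[j] != NULL; j++) {
			printf ("  add queue %s to group %s\n",
				groups[i].members[j]->name, groups[i].name);
		}
	}
}

int main (void)
{
	struct queue queues[] = {
		{ "queue-a", { "hello", "world", NULL } },
		{ "queue-b", { "one more message", NULL } },
	};
	struct group groups[] = {
		{ "group-x", { &queues[0], &queues[1], NULL } },
	};

	sync_queues (queues, 2);	/* first pass: queues and messages */
	sync_groups (groups, 1);	/* second pass: groups and membership */

	return 0;
}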

Also note that this patch includes an important fix: saMsgQueueGroupInsert
would incorrectly add the queue being inserted to queue_list_head.

Ryan

-------------- next part --------------
Index: test/Makefile
===================================================================
--- test/Makefile	(revision 1638)
+++ test/Makefile	(working copy)
@@ -39,7 +39,7 @@
 
 LIBRARIES= ../lib/libSaCkpt.a ../lib/libSaMsg.a ../lib/libSaEvt.a sa_error.o
 LIBS = $(LIBRARIES) 
-BINARIES= testckpt testmsg testmsg2 testevt
+BINARIES= testckpt testmsg testmsg2 testmsg3 testevt
 
 override CFLAGS += -I../include
 override LDFLAGS += -L../lib
@@ -58,6 +58,9 @@
 testmsg2: testmsg2.o $(LIBRARIES)
 	$(CC) $(LDFLAGS) -o testmsg2 testmsg2.o $(LIBS)
 
+testmsg3: testmsg3.o $(LIBRARIES)
+	$(CC) $(LDFLAGS) -o testmsg3 testmsg3.o $(LIBS)
+
 testevt: testevt.o $(LIBRARIES)
 	$(CC) $(LDFLAGS) -o testevt testevt.o $(LIBS)
 clean:
Index: services/msg.c
===================================================================
--- services/msg.c	(revision 1638)
+++ services/msg.c	(working copy)
@@ -76,7 +76,11 @@
 	MESSAGE_REQ_EXEC_MSG_MESSAGEGET = 11,
 	MESSAGE_REQ_EXEC_MSG_MESSAGECANCEL = 12,
 	MESSAGE_REQ_EXEC_MSG_MESSAGESENDRECEIVE = 13,
-	MESSAGE_REQ_EXEC_MSG_MESSAGEREPLY = 14
+	MESSAGE_REQ_EXEC_MSG_MESSAGEREPLY = 14,
+	MESSAGE_REQ_EXEC_MSG_SYNC_QUEUE = 15,
+	MESSAGE_REQ_EXEC_MSG_SYNC_QUEUE_ENTRY = 16,
+	MESSAGE_REQ_EXEC_MSG_SYNC_GROUP = 17,
+	MESSAGE_REQ_EXEC_MSG_SYNC_GROUP_ENTRY = 18,
 };
 
 struct message_entry {
@@ -103,6 +107,16 @@
 	struct list_head queue_head;
 };
 
+enum sync_state {
+	SYNC_STATE_GROUP,
+	SYNC_STATE_QUEUE,
+};
+
+enum iteration_state {
+	ITERATION_STATE_GROUP,
+	ITERATION_STATE_QUEUE,
+};
+
 /*
 struct queue_group_entry {
 	SaMsgQueueGroupChangesT change;
@@ -123,6 +137,9 @@
 DECLARE_LIST_INIT(queue_list_head);
 DECLARE_LIST_INIT(group_list_head);
 
+DECLARE_LIST_INIT(sync_queue_list_head);
+DECLARE_LIST_INIT(sync_group_list_head);
+
 static struct corosync_api_v1 *api;
 
 static int msg_exec_init_fn (struct corosync_api_v1 *);
@@ -131,6 +148,26 @@
 
 static int msg_lib_init_fn (void *conn);
 
+static enum sync_state my_sync_state;
+
+static enum iteration_state my_iteration_state;
+
+static struct list_head *my_iteration_state_group;
+
+static struct list_head *my_iteration_state_queue;
+
+static struct list_head *my_iteration_state_entry;
+
+static unsigned int my_member_list[PROCESSOR_COUNT_MAX];
+
+static unsigned int my_member_list_entries = 0;
+
+static unsigned int my_lowest_nodeid = 0;
+
+/*
+static int msg_dump_fn (void);
+*/
+
 static void message_handler_req_exec_msg_queueopen (
 	void *message,
 	unsigned int nodeid);
@@ -191,6 +228,22 @@
 	void *message,
 	unsigned int nodeid);
 
+static void message_handler_req_exec_msg_sync_queue (
+	void *message,
+	unsigned int nodeid);
+
+static void message_handler_req_exec_msg_sync_queue_entry (
+	void *message,
+	unsigned int nodeid);
+
+static void message_handler_req_exec_msg_sync_group (
+	void *message,
+	unsigned int nodeid);
+
+static void message_handler_req_exec_msg_sync_group_entry (
+	void *message,
+	unsigned int nodeid);
+
 static void message_handler_req_lib_msg_queueopen (
 	void *conn,
 	void *msg);
@@ -263,16 +316,17 @@
 	void *conn,
 	void *msg);
 
-#ifdef TODO
 static void msg_sync_init (void);
-#endif	/* TODO */
-
 static void msg_sync_activate (void);
 static int  msg_sync_process (void);
 static void msg_sync_abort(void);
 
+void queue_entry_release (struct message_entry *entry);
 void queue_release (struct message_queue *queue);
+void group_release (struct queue_group *group);
 
+static struct memb_ring_id my_saved_ring_id;
+
 static void msg_confchg_fn (
 	enum totem_configuration_type configuration_type,
 	unsigned int *member_list, int member_list_entries,
@@ -445,8 +499,20 @@
 		.exec_handler_fn		= message_handler_req_exec_msg_messagesendreceive,
 	},
 	{
-		.exec_handler_fn		= message_handler_req_exec_msg_messagereply
-	}
+		.exec_handler_fn		= message_handler_req_exec_msg_messagereply,
+	},
+	{
+		.exec_handler_fn		= message_handler_req_exec_msg_sync_queue,
+	},
+	{
+		.exec_handler_fn		= message_handler_req_exec_msg_sync_queue_entry,
+	},
+	{
+		.exec_handler_fn		= message_handler_req_exec_msg_sync_group,
+	},
+	{
+		.exec_handler_fn		= message_handler_req_exec_msg_sync_group_entry,
+	},
 };
 
 struct corosync_service_engine msg_service_engine = {
@@ -459,14 +525,14 @@
 	.lib_engine			= msg_lib_engine,
 	.lib_engine_count		= sizeof (msg_lib_engine) / sizeof (struct corosync_lib_handler),
 	.exec_init_fn			= msg_exec_init_fn,
+	.exec_dump_fn			= NULL,
 	.exec_engine			= msg_exec_engine,
 	.exec_engine_count		= sizeof (msg_exec_engine) / sizeof (struct corosync_exec_handler),
 	.confchg_fn			= msg_confchg_fn,
-	.exec_dump_fn			= NULL,
-	.sync_init			= NULL, // TODO msg_sync_init,
+	.sync_init			= msg_sync_init,
 	.sync_process			= msg_sync_process,
 	.sync_activate			= msg_sync_activate,
-	.sync_abort			= msg_sync_abort
+	.sync_abort			= msg_sync_abort,
 };
 
 static struct corosync_service_engine *msg_get_engine_ver0 (void);
@@ -615,6 +681,38 @@
 	int async_call;
 };
 
+struct req_exec_msg_sync_queue {
+	mar_req_header_t header;
+	struct memb_ring_id ring_id;
+	SaNameT group_name;
+	SaNameT queue_name;
+	SaMsgQueueGroupChangesT change;
+};
+
+struct req_exec_msg_sync_queue_entry {
+	mar_req_header_t header;
+	struct memb_ring_id ring_id;
+	SaNameT queue_name;
+	SaTimeT time;
+	SaMsgMessageT message;
+};
+
+struct req_exec_msg_sync_group {
+	mar_req_header_t header;
+	struct memb_ring_id ring_id;
+	SaNameT group_name;
+	SaUint8T track_flags;
+	SaMsgQueueGroupPolicyT policy;
+	SaNameT rr_queue_name;
+};
+
+struct req_exec_msg_sync_group_entry {
+	mar_req_header_t header;
+	struct memb_ring_id ring_id;
+	SaNameT group_name;
+	SaNameT queue_name;
+};
+
 char *getSaNameT (SaNameT *name)
 {
 #if 0
@@ -641,7 +739,7 @@
 	return 0;
 }
 
-SaTimeT clust_time_now(void)
+SaTimeT cluster_time_now(void)
 {
 	struct timeval tv;
 	SaTimeT time_now;
@@ -656,83 +754,111 @@
 	return time_now;
 }
 
-#ifdef TODO
-static void msg_sync_init (void) 
+#if 0
+static void print_entry_list (struct message_queue *queue)
 {
-	return;
-}
-#endif	/* TODO */
+	struct list_head *list;
+	struct message_entry *entry;
 
-static int msg_sync_process (void) 
-{
-	return (0);
-}
+	for (list = queue->message_head.next;
+	     list != &queue->message_head;
+	     list = list->next)
+	{
+		entry = list_entry (list, struct message_entry, list);
 
-static void msg_sync_activate (void) 
-{		
- 	return;
+		log_printf (LOG_LEVEL_NOTICE, "[DEBUG]: print_entry_list ( %s )\n",
+			    (char *)(entry->message.data));
+	}
 }
+#endif
 
-static void msg_sync_abort (void) 
-{
-	return;
-}
-
-static void msg_confchg_fn (
-	enum totem_configuration_type configuration_type,
-	unsigned int *member_list, int member_list_entries,
-	unsigned int *left_list, int left_list_entries,
-	unsigned int *joined_list, int joined_list_entries,
-	struct memb_ring_id *ring_id) 
-{
-	return;
-}
-
 #if 0
-static void print_message_list (struct message_queue *queue)
+static void print_queue_list (struct list_head *head)
 {
 	struct list_head *list;
-	struct message_entry *message;
+	struct message_queue *queue;
 
-	for (list = queue->message_head.next;
-	     list != &queue->message_head;
+	for (list = head->next;
+	     list != head;
 	     list = list->next)
 	{
-		message = list_entry (list, struct message_entry, list);
+		queue = list_entry (list, struct message_queue, queue_list);
 
-		log_printf (LOG_LEVEL_NOTICE, "[DEBUG]: print_message_list (%s) (%llu)\n",
-			    (char *)(message->message.data),
-			    (unsigned long long)(message->time));
+		log_printf (LOG_LEVEL_NOTICE, "[DEBUG]: print_queue_list ( %s )\n",
+			    getSaNameT (&queue->name));
 	}
 }
 #endif
 
 #if 0
-static void print_queue_group_list (struct queue_group *group)
+static void print_group_list (struct list_head *head)
 {
 	struct list_head *list;
-	struct message_queue *queue;
+	struct queue_group *group;
 
-	for (list = group->queue_head.next;
-	     list != &group->queue_head;
+	for (list = head->next;
+	     list != head;
 	     list = list->next)
 	{
-		queue = list_entry (list, struct message_queue, group_list);
+		group = list_entry (list, struct queue_group, group_list);
 
-		log_printf (LOG_LEVEL_NOTICE, "[DEBUG]: print_queue_group_list (%s) (%u)\n",
-			    (char *)(queue->name.value),
-			    (unsigned int)(queue->change));
+		log_printf (LOG_LEVEL_NOTICE, "[DEBUG]: print_group_list ( %s )\n",
+			    getSaNameT (&group->name));
 	}
 }
 #endif
 
-static struct message_queue *queue_find (SaNameT *name)
+void queue_entry_release (struct message_entry *entry)
 {
+	/* DEBUG */
+	log_printf (LOG_LEVEL_NOTICE, "[DEBUG]: queue_entry_release { %s }\n",
+		(char *)(entry->message.data));
+
+	if (entry->message.data) {
+		free (entry->message.data);
+	}
+	free (entry);
+}
+
+void queue_release (struct message_queue *queue)
+{
 	struct list_head *list;
+	struct message_entry *entry;
+
+	/* DEBUG */
+	log_printf (LOG_LEVEL_NOTICE, "[DEBUG]: queue_release { %s }\n",
+		getSaNameT (&queue->name));
+
+	for (list = queue->message_head.next;
+	     list != &queue->message_head;) {
+
+		entry = list_entry (list, struct message_entry, list);
+
+		list = list->next;
+		queue_entry_release (entry);
+	}
+	list_del (&queue->queue_list);
+	list_del (&queue->group_list);
+	free (queue);
+}
+
+void group_release (struct queue_group *group)
+{
+	/* DEBUG */
+	log_printf (LOG_LEVEL_NOTICE, "[DEBUG]: group_release { %s }\n",
+		getSaNameT (&group->name));
+
+	list_del (&group->group_list);
+	free (group);
+}
+
+static struct message_queue *sync_queue_find (SaNameT *name)
+{
+	struct list_head *list;
 	struct message_queue *queue;
 
-	for (list = queue_list_head.next;
-	     list != &queue_list_head;
+	for (list = sync_queue_list_head.next;
+	     list != &sync_queue_list_head;
 	     list = list->next)
 	{
 	        queue = list_entry (list, struct message_queue, queue_list);
@@ -744,13 +870,13 @@
 	return (0);
 }
 
-static struct queue_group *group_find (SaNameT *name)
+static struct queue_group *sync_group_find (SaNameT *name)
 {
 	struct list_head *list;
 	struct queue_group *group;
 
-	for (list = group_list_head.next;
-	     list != &group_list_head;
+	for (list = sync_group_list_head.next;
+	     list != &sync_group_list_head;
 	     list = list->next)
 	{
 	        group = list_entry (list, struct queue_group, group_list);
@@ -762,17 +888,361 @@
 	return (0);
 }
 
-#if 0
-static struct message_queue *group_queue_find (struct queue_group *group, SaNameT *name)
+static inline void sync_group_free (struct list_head *group_head)
 {
+	struct queue_group *group;
 	struct list_head *list;
+
+	list = group_head->next;
+
+	while (list != group_head) {
+		group = list_entry (list, struct queue_group, group_list);
+		list = list->next;
+		group_release (group);
+	}
+
+	list_init (group_head);
+}
+
+static inline void sync_queue_free (struct list_head *queue_head)
+{
 	struct message_queue *queue;
+	struct list_head *list;
 
-	for (list = group->queue_head.next;
-	     list != &group->queue_head;
+	list = queue_head->next;
+
+	while (list != queue_head) {
+		queue = list_entry (list, struct message_queue, queue_list);
+		list = list->next;
+		queue_release (queue);
+	}
+
+	list_init (queue_head);
+}
+
+static inline void sync_queue_enter (void)
+{
+	struct message_queue *queue;
+
+	ENTER();
+
+	my_sync_state = SYNC_STATE_QUEUE;
+	my_iteration_state = ITERATION_STATE_QUEUE;
+	my_iteration_state_queue = queue_list_head.next;
+
+	queue = list_entry (queue_list_head.next, struct message_queue, queue_list);
+
+	my_iteration_state_entry = queue->message_head.next;
+
+	LEAVE();
+}
+
+static inline void sync_group_enter (void)
+{
+	my_sync_state = SYNC_STATE_GROUP;
+}
+
+static int sync_queue_transmit (
+	struct message_queue *queue)
+{
+	struct req_exec_msg_sync_queue req_exec_msg_sync_queue;
+	struct iovec iovec;
+
+	req_exec_msg_sync_queue.header.size =
+		sizeof (struct req_exec_msg_sync_queue);
+	req_exec_msg_sync_queue.header.id =
+		SERVICE_ID_MAKE (MSG_SERVICE, MESSAGE_REQ_EXEC_MSG_SYNC_QUEUE);
+
+	memcpy (&req_exec_msg_sync_queue.ring_id,
+		&my_saved_ring_id, sizeof (struct memb_ring_id));
+	memcpy (&req_exec_msg_sync_queue.queue_name,
+		&queue->name, sizeof (SaNameT));
+
+	req_exec_msg_sync_queue.change = queue->change;
+
+	iovec.iov_base = (char *)&req_exec_msg_sync_queue;
+	iovec.iov_len = sizeof (req_exec_msg_sync_queue);
+
+	return (api->totem_mcast (&iovec, 1, TOTEM_AGREED));
+}
+
+static int sync_queue_entry_transmit (
+	struct message_queue *queue,
+	struct message_entry *entry)
+{
+	struct req_exec_msg_sync_queue_entry req_exec_msg_sync_queue_entry;
+	struct iovec iovec[2];
+
+	req_exec_msg_sync_queue_entry.header.size =
+		sizeof (struct req_exec_msg_sync_queue_entry);
+	req_exec_msg_sync_queue_entry.header.id =
+		SERVICE_ID_MAKE (MSG_SERVICE, MESSAGE_REQ_EXEC_MSG_SYNC_QUEUE_ENTRY);
+
+	memcpy (&req_exec_msg_sync_queue_entry.ring_id,
+		&my_saved_ring_id, sizeof (struct memb_ring_id));
+	memcpy (&req_exec_msg_sync_queue_entry.queue_name,
+		&queue->name, sizeof (SaNameT));
+	memcpy (&req_exec_msg_sync_queue_entry.message,
+		&entry->message, sizeof (SaMsgMessageT));
+
+	req_exec_msg_sync_queue_entry.time = entry->time;
+
+	iovec[0].iov_base = (char *)&req_exec_msg_sync_queue_entry;
+	iovec[0].iov_len = sizeof (req_exec_msg_sync_queue_entry);
+	iovec[1].iov_base = entry->message.data;
+	iovec[1].iov_len = entry->message.size;
+
+	return (api->totem_mcast (iovec, 2, TOTEM_AGREED));
+}
+
+static int sync_group_transmit (
+	struct queue_group *group)
+{
+	struct req_exec_msg_sync_group req_exec_msg_sync_group;
+	struct iovec iovec;
+
+	req_exec_msg_sync_group.header.size =
+		sizeof (req_exec_msg_sync_group);
+	req_exec_msg_sync_group.header.id =
+		SERVICE_ID_MAKE (MSG_SERVICE, MESSAGE_REQ_EXEC_MSG_SYNC_GROUP);
+
+	memcpy (&req_exec_msg_sync_group.ring_id,
+		&my_saved_ring_id, sizeof (struct memb_ring_id));
+	memcpy (&req_exec_msg_sync_group.group_name,
+		&group->name, sizeof (SaNameT));
+
+	req_exec_msg_sync_group.policy = group->policy;
+	req_exec_msg_sync_group.track_flags = group->track_flags;
+
+	iovec.iov_base = (char *)&req_exec_msg_sync_group;
+	iovec.iov_len = sizeof (req_exec_msg_sync_group);
+
+	return (api->totem_mcast (&iovec, 1, TOTEM_AGREED));
+}
+
+static int sync_group_entry_transmit (
+	struct queue_group *group,
+	struct message_queue *queue)
+{
+	struct req_exec_msg_sync_group_entry req_exec_msg_sync_group_entry;
+	struct iovec iovec;
+
+	req_exec_msg_sync_group_entry.header.size =
+		sizeof (struct req_exec_msg_sync_group_entry);
+	req_exec_msg_sync_group_entry.header.id =
+		SERVICE_ID_MAKE (MSG_SERVICE, MESSAGE_REQ_EXEC_MSG_SYNC_GROUP_ENTRY);
+
+	memcpy (&req_exec_msg_sync_group_entry.ring_id,
+		&my_saved_ring_id, sizeof (struct memb_ring_id));
+	memcpy (&req_exec_msg_sync_group_entry.group_name,
+		&group->name, sizeof (SaNameT));
+	memcpy (&req_exec_msg_sync_group_entry.queue_name,
+		&queue->name, sizeof (SaNameT));
+
+	iovec.iov_base = (char *)&req_exec_msg_sync_group_entry;
+	iovec.iov_len = sizeof (req_exec_msg_sync_group_entry);
+
+	return (api->totem_mcast (&iovec, 1, TOTEM_AGREED));
+}
+
+static int sync_queue_iterate (void)
+{
+	struct message_queue *queue;
+	struct message_entry *entry;
+	struct list_head *queue_list;
+	struct list_head *entry_list;
+	unsigned int res = 0;
+
+	for (queue_list = queue_list_head.next;
+	     queue_list != &queue_list_head;
+	     queue_list = queue_list->next)
+	{
+		queue = list_entry (queue_list, struct message_queue, queue_list);
+
+		res = sync_queue_transmit (queue);
+		if (res != 0) {
+			break;
+		}
+
+		for (entry_list = queue->message_head.next;
+		     entry_list != &queue->message_head;
+		     entry_list = entry_list->next)
+		{
+			entry = list_entry (entry_list, struct message_entry, list);
+			res = sync_queue_entry_transmit (queue, entry);
+		}
+	}
+
+	return (res);
+}
+
+static int sync_group_iterate (void)
+{
+	struct queue_group *group;
+	struct message_queue *queue;
+	struct list_head *group_list;
+	struct list_head *queue_list;
+	unsigned int res = 0;
+
+	for (group_list = group_list_head.next;
+	     group_list != &group_list_head;
+	     group_list = group_list->next)
+	{
+		group = list_entry (group_list, struct queue_group, group_list);
+
+		res = sync_group_transmit (group);
+		if (res != 0) {
+			break;
+		}
+
+		for (queue_list = group->queue_head.next;
+		     queue_list != &group->queue_head;
+		     queue_list = queue_list->next)
+		{
+			queue = list_entry (queue_list, struct message_queue, group_list);
+			res = sync_group_entry_transmit (group, queue);
+		}
+	}
+
+	return (res);
+}
+
+static void msg_sync_init (void) 
+{
+	ENTER();
+
+	sync_queue_enter ();
+
+	LEAVE();
+}
+
+static int msg_sync_process (void)
+{
+	unsigned int done_queueing = 1;
+	unsigned int continue_processing = 0;
+	unsigned int res;
+
+	switch (my_sync_state)
+	{
+	case SYNC_STATE_QUEUE:
+		if (my_lowest_nodeid == api->totem_nodeid_get ()) {
+			TRACE1 ("transmit queues because lowest member in old configuration.\n");
+
+			res = sync_queue_iterate ();
+			if (res == 0) {
+				done_queueing = 1;
+			}
+		}
+		if (done_queueing) {
+			sync_group_enter ();
+		}
+
+		continue_processing = 1;
+		break;
+
+	case SYNC_STATE_GROUP:
+		if (my_lowest_nodeid == api->totem_nodeid_get ()) {
+			TRACE1 ("transmit groups because lowest member in old configuration.\n");
+
+			res = sync_group_iterate ();
+		}
+		if (done_queueing) {
+			continue_processing = 0;
+		}
+		break;
+	}
+
+	return (continue_processing);
+}
+
+static void msg_sync_activate (void) 
+{
+	sync_group_free (&group_list_head);
+	sync_queue_free (&queue_list_head);
+
+	if (!list_empty (&sync_group_list_head)) {
+		list_splice (&sync_group_list_head, &group_list_head);
+	}
+
+	if (!list_empty (&sync_queue_list_head)) {
+		list_splice (&sync_queue_list_head, &queue_list_head);
+	}
+
+	list_init (&sync_group_list_head);
+	list_init (&sync_queue_list_head);
+
+	/* DEBUG
+	log_printf (LOG_LEVEL_NOTICE, "[DEBUG]: group list after activate...\n");
+	print_group_list (&group_list_head);
+	*/
+
+	/* DEBUG
+	log_printf (LOG_LEVEL_NOTICE, "[DEBUG]: queue list after activate...\n");
+	print_queue_list (&queue_list_head);
+	*/
+
+ 	return;
+}
+
+static void msg_sync_abort (void) 
+{
+	sync_group_free (&sync_group_list_head);
+	sync_queue_free (&sync_queue_list_head);
+}
+
+static void msg_confchg_fn (
+	enum totem_configuration_type configuration_type,
+	unsigned int *member_list, int member_list_entries,
+	unsigned int *left_list, int left_list_entries,
+	unsigned int *joined_list, int joined_list_entries,
+	struct memb_ring_id *ring_id) 
+{
+	unsigned int i, j;
+
+	/*
+	 * Determine lowest nodeid in old regular configuration for the
+	 * purpose of executing the synchronization algorithm
+	 */
+	if (configuration_type == TOTEM_CONFIGURATION_TRANSITIONAL) {
+		for (i = 0; i < left_list_entries; i++) {
+			for (j = 0; j < my_member_list_entries; j++) {
+				if (left_list[i] == my_member_list[j]) {
+					my_member_list[j] = 0;
+				}
+			}
+		}
+	}
+
+	my_lowest_nodeid = 0xffffffff;
+
+	/*
+	 * Handle regular configuration
+	 */
+	if (configuration_type == TOTEM_CONFIGURATION_REGULAR) {
+		memcpy (my_member_list, member_list,
+			sizeof (unsigned int) * member_list_entries);
+		my_member_list_entries = member_list_entries;
+		memcpy (&my_saved_ring_id, ring_id,
+			sizeof (struct memb_ring_id));
+		for (i = 0; i < my_member_list_entries; i++) {
+			if ((my_member_list[i] != 0) &&
+			    (my_member_list[i] < my_lowest_nodeid)) {
+				my_lowest_nodeid = my_member_list[i];
+			}
+		}
+	}
+}
+
+static struct message_queue *queue_find (SaNameT *name)
+{
+	struct list_head *list;
+	struct message_queue *queue;
+
+	for (list = queue_list_head.next;
+	     list != &queue_list_head;
 	     list = list->next)
 	{
-	        queue = list_entry (list, struct message_queue, group_list);
+	        queue = list_entry (list, struct message_queue, queue_list);
 
 		if (name_match (name, &queue->name)) {
 			return (queue);
@@ -780,8 +1250,25 @@
 	}
 	return (0);
 }
-#endif
 
+static struct queue_group *group_find (SaNameT *name)
+{
+	struct list_head *list;
+	struct queue_group *group;
+
+	for (list = group_list_head.next;
+	     list != &group_list_head;
+	     list = list->next)
+	{
+	        group = list_entry (list, struct queue_group, group_list);
+
+		if (name_match (name, &group->name)) {
+			return (group);
+		}
+	}
+	return (0);
+}
+
 static unsigned int queue_group_member_count (struct queue_group *group)
 {
 	struct list_head *list;
@@ -1237,7 +1724,6 @@
 
 	list_init (&queue->group_list);
 	list_add (&queue->group_list, &queue_group->queue_head);
-	list_add (&queue->queue_list, &queue_list_head);
 
 	queue->change = SA_MSG_QUEUE_GROUP_ADDED;
 
@@ -1715,7 +2201,7 @@
 	memset (entry->message.data, 0, entry->message.size);
 	memcpy (entry->message.data, (void *)(data), entry->message.size);
 
-	entry->time = clust_time_now();
+	entry->time = cluster_time_now();
 
 	if (group != NULL) {
 		/* if (group->policy == SA_MSG_QUEUE_GROUP_ROUND_ROBIN) */
@@ -1868,6 +2354,199 @@
 #endif
 }
 
+static void message_handler_req_exec_msg_sync_queue (
+	void *message,
+	unsigned int nodeid)
+{
+	struct req_exec_msg_sync_queue *req_exec_msg_sync_queue =
+		(struct req_exec_msg_sync_queue *)message;
+	struct message_queue *queue = NULL;
+
+	log_printf (LOG_LEVEL_NOTICE, "EXEC request: sync queue %s\n",
+		getSaNameT (&req_exec_msg_sync_queue->queue_name));
+
+	/*
+	 * Ignore messages from previous ring
+	 */
+	if (memcmp (&req_exec_msg_sync_queue->ring_id,
+		    &my_saved_ring_id, sizeof (struct memb_ring_id)) != 0)
+	{
+		return;
+	}
+
+	queue = sync_queue_find (&req_exec_msg_sync_queue->queue_name);
+
+	if (queue == NULL)
+	{
+		queue = malloc (sizeof (struct message_queue));
+		if (queue == NULL) {
+#ifdef TODO
+			openais_exit_error (AIS_DONE_OUT_OF_MEMORY);
+#endif
+		}
+		memset (queue, 0, sizeof (struct message_queue));
+		memcpy (&queue->name,
+			&req_exec_msg_sync_queue->queue_name,
+			sizeof (SaNameT));
+
+		queue->change = req_exec_msg_sync_queue->change;
+
+		list_init (&queue->group_list);
+		list_init (&queue->queue_list);
+		list_init (&queue->message_head);
+		list_add (&queue->queue_list, &sync_queue_list_head);
+
+		/* DEBUG */
+		/* print_queue_list (&sync_queue_list_head); */
+	}
+}
+
+static void message_handler_req_exec_msg_sync_queue_entry (
+	void *message,
+	unsigned int nodeid)
+{
+	struct req_exec_msg_sync_queue_entry *req_exec_msg_sync_queue_entry =
+		(struct req_exec_msg_sync_queue_entry *)message;
+	struct message_queue *queue;
+	struct message_entry *entry;
+
+	char *data = ((char *)(req_exec_msg_sync_queue_entry) +
+		      sizeof (struct req_exec_msg_sync_queue_entry));
+
+	log_printf (LOG_LEVEL_NOTICE, "EXEC request: sync queue entry %s\n",
+		getSaNameT (&req_exec_msg_sync_queue_entry->queue_name));
+
+	/*
+	 * Ignore messages from previous ring
+	 */
+	if (memcmp (&req_exec_msg_sync_queue_entry->ring_id,
+		    &my_saved_ring_id, sizeof (struct memb_ring_id)) != 0)
+	{
+		return;
+	}
+
+	queue = sync_queue_find (&req_exec_msg_sync_queue_entry->queue_name);
+
+	assert (queue != NULL);
+
+	entry = malloc (sizeof (struct message_entry));
+	if (entry == NULL) {
+#ifdef TODO
+		openais_exit_error (AIS_DONE_OUT_OF_MEMORY);		
+#endif
+	}
+
+	memset (entry, 0, sizeof (struct message_entry));
+	memcpy (&entry->message, &req_exec_msg_sync_queue_entry->message,
+		sizeof (SaMsgMessageT));
+
+	entry->message.data = malloc (entry->message.size);
+	if (entry->message.data == NULL) {
+#ifdef TODO
+		openais_exit_error (AIS_DONE_OUT_OF_MEMORY);		
+#endif
+	}
+
+	memset (entry->message.data, 0, entry->message.size);
+	memcpy (entry->message.data, (void *)(data), entry->message.size);
+
+	entry->time = req_exec_msg_sync_queue_entry->time;
+
+	list_add_tail (&entry->list, &queue->message_head);
+
+	/* DEBUG */
+	/* print_entry_list (queue); */
+}
+
+static void message_handler_req_exec_msg_sync_group (
+	void *message,
+	unsigned int nodeid)
+{
+	struct req_exec_msg_sync_group *req_exec_msg_sync_group =
+		(struct req_exec_msg_sync_group *)message;
+	struct queue_group *group = NULL;
+
+	log_printf (LOG_LEVEL_NOTICE, "EXEC request: sync group %s\n",
+		getSaNameT (&req_exec_msg_sync_group->group_name));
+
+	/*
+	 * Ignore messages from previous ring
+	 */
+	if (memcmp (&req_exec_msg_sync_group->ring_id,
+		    &my_saved_ring_id, sizeof (struct memb_ring_id)) != 0)
+	{
+		return;
+	}
+
+	group = sync_group_find (&req_exec_msg_sync_group->group_name);
+
+	if (group == NULL)
+	{
+		group = malloc (sizeof (struct queue_group));
+		if (group == NULL) {
+#ifdef TODO
+			openais_exit_error (AIS_DONE_OUT_OF_MEMORY);
+#endif
+		}
+		memset (group, 0, sizeof (struct queue_group));
+		memcpy (&group->name,
+			&req_exec_msg_sync_group->group_name,
+			sizeof (SaNameT));
+
+		group->policy = req_exec_msg_sync_group->policy;
+		group->track_flags = req_exec_msg_sync_group->track_flags;
+
+		group->rr_queue = NULL; /* FIXME */
+
+		list_init (&group->group_list);
+		list_init (&group->queue_head);
+		list_add (&group->group_list, &sync_group_list_head);
+
+		/* DEBUG */
+		/* print_group_list (&sync_group_list_head); */
+	}
+}
+
+static void message_handler_req_exec_msg_sync_group_entry (
+	void *message,
+	unsigned int nodeid)
+{
+	struct req_exec_msg_sync_group_entry *req_exec_msg_sync_group_entry =
+		(struct req_exec_msg_sync_group_entry *)message;
+	struct queue_group *group;
+	struct message_queue *queue;
+
+	log_printf (LOG_LEVEL_NOTICE, "EXEC request: sync group entry %s\n",
+		getSaNameT (&req_exec_msg_sync_group_entry->group_name));
+
+	/*
+	 * Ignore messages from previous ring
+	 */
+	if (memcmp (&req_exec_msg_sync_group_entry->ring_id,
+		    &my_saved_ring_id, sizeof (struct memb_ring_id)) != 0)
+	{
+		return;
+	}
+
+	group = sync_group_find (&req_exec_msg_sync_group_entry->group_name);
+	queue = sync_queue_find (&req_exec_msg_sync_group_entry->queue_name);
+
+	assert (group != NULL);
+	assert (queue != NULL);
+
+
+	/*
+	list_init (&queue->group_list);
+	list_add (&queue->group_list, &group->queue_head);
+	*/
+
+	/* DEBUG */
+	/*
+	log_printf (LOG_LEVEL_NOTICE, "[DEBUG]: -=-=- GROUP = %s -=-=-\n",
+		    getSaNameT (&group->name));
+	print_queue_list (&group->queue_head);
+	*/
+}
+
 static void message_handler_req_lib_msg_queueopen (
 	void *conn,
 	void *msg)
Index: lib/msg.c
===================================================================
--- lib/msg.c	(revision 1638)
+++ lib/msg.c	(working copy)
@@ -1321,6 +1321,14 @@
 	struct req_lib_msg_messagesend req_lib_msg_messagesend;
 	struct res_lib_msg_messagesend res_lib_msg_messagesend;
 
+	if (destination == NULL) {
+		return (SA_AIS_ERR_INVALID_PARAM);
+	}
+
+	if (message == NULL) {
+		return (SA_AIS_ERR_INVALID_PARAM);
+	}
+
 	/* DEBUG */
 	printf ("[DEBUG]: saMsgMessageSend { msgHandle = %llx }\n",
 		(unsigned long long) msgHandle);

