[Openais] [PATCH 2/7] CTS: add cpg_zcb tests

Angus Salkeld asalkeld at redhat.com
Mon Mar 29 19:29:24 PDT 2010


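Add a CTS test for the zero-copy buffer (zcb) CPG interface: a new msg_blaster_zcb agent command plus a CpgMsgOrderZcb test case that drives it.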

-Angus

Signed-off-by: Angus Salkeld <asalkeld at redhat.com>
---
 cts/agents/cpg_test_agent.c |  108 +++++++++++++++++++++++++++++++++++++++++-
 cts/corotests.py            |   24 +++++++++-
 2 files changed, 127 insertions(+), 5 deletions(-)

diff --git a/cts/agents/cpg_test_agent.c b/cts/agents/cpg_test_agent.c
index ac87af4..8c018b5 100644
--- a/cts/agents/cpg_test_agent.c
+++ b/cts/agents/cpg_test_agent.c
@@ -88,6 +88,7 @@ static struct list_head msg_log_head;
 static pid_t my_pid;
 static uint32_t my_nodeid;
 static int32_t my_seq;
+static int32_t use_zcb = 0;
 static int32_t my_msgs_to_send;
 static int32_t total_stored_msgs = 0;
 
@@ -285,13 +286,80 @@ static void send_some_more_messages_later (void)
 	cpg_dispatch (cpg_handle, CS_DISPATCH_ALL);
 	poll_timer_add (
 		ta_poll_handle_get(),
-		100, NULL,
+		300, NULL,
 		send_some_more_messages,
 		&timer_handle);
 }
 
+
+
+static void send_some_more_messages_zcb (void)
+{
+	msg_t *my_msg;
+	int i;
+	int send_now;
+	size_t payload_size;
+	size_t total_size;
+	hash_state sha1_hash;
+	cs_error_t res;
+	cpg_flow_control_state_t fc_state;
+	void *zcb_buffer;
+
+	if (cpg_fd < 0)
+		return;
+
+	send_now = my_msgs_to_send;
+	payload_size = (rand() % 100000);
+	total_size = payload_size + sizeof (msg_t);
+	cpg_zcb_alloc (cpg_handle, total_size, &zcb_buffer);
+
+	my_msg = (msg_t*)zcb_buffer;
+
+	//syslog (LOG_DEBUG,"%s() send_now:%d", __func__, send_now);
+	my_msg->pid = my_pid;
+	my_msg->nodeid = my_nodeid;
+	my_msg->size = sizeof (msg_t) + payload_size;
+	my_msg->seq = 0;
+	for (i = 0; i < payload_size; i++) {
+		my_msg->payload[i] = i;
+	}
+	sha1_init (&sha1_hash);
+	sha1_process (&sha1_hash, my_msg->payload, payload_size);
+	sha1_done (&sha1_hash, my_msg->sha1);
+
+	for (i = 0; i < send_now; i++) {
+
+		res = cpg_flow_control_state_get (cpg_handle, &fc_state);
+		if (res == CS_OK && fc_state == CPG_FLOW_CONTROL_ENABLED) {
+			/* let's do this later */
+			send_some_more_messages_later ();
+			syslog (LOG_INFO, "%s() flow control enabled.", __func__);
+			goto free_buffer;
+		}
+
+		res = cpg_zcb_mcast_joined (cpg_handle, CPG_TYPE_AGREED, zcb_buffer, total_size);
+		if (res == CS_ERR_TRY_AGAIN) {
+			/* let's do this later */
+			send_some_more_messages_later ();
+			syslog (LOG_INFO, "%s() cpg_mcast_joined() says try again.",
+				__func__);
+			goto free_buffer;
+		} else if (res != CS_OK) {
+			syslog (LOG_ERR, "%s() -> cpg_mcast_joined error:%d, exiting.",
+				__func__, res);
+			exit (-2);
+		}
+
+		my_msgs_to_send--;
+	}
+free_buffer:
+	cpg_zcb_free (cpg_handle, zcb_buffer);
+}
+
+
+
 static unsigned char buffer[200000];
-static void send_some_more_messages (void * unused)
+static void send_some_more_messages_normal (void)
 {
 	msg_t my_msg;
 	struct iovec iov[2];
@@ -353,12 +421,23 @@ static void send_some_more_messages (void * unused)
 	}
 }
 
+static void send_some_more_messages (void * unused)
+{
+	if (use_zcb) {
+		send_some_more_messages_zcb ();
+	} else {
+		send_some_more_messages_normal ();
+	}
+}
+
 static void msg_blaster (int sock, char* num_to_send_str)
 {
 	my_msgs_to_send = atoi (num_to_send_str);
 	my_seq = 1;
 	my_pid = getpid();
 
+	use_zcb = 0;
+
 	cpg_local_get (cpg_handle, &my_nodeid);
 
 	/* control the limits */
@@ -367,7 +446,26 @@ static void msg_blaster (int sock, char* num_to_send_str)
 	if (my_msgs_to_send > 10000)
 		my_msgs_to_send = 10000;
 
-	send_some_more_messages (NULL);
+	send_some_more_messages_normal ();
+}
+
+static void msg_blaster_zcb (int sock, char* num_to_send_str)
+{
+	my_msgs_to_send = atoi (num_to_send_str);
+	my_seq = 1;
+	my_pid = getpid();
+
+	use_zcb = 1;
+
+	cpg_local_get (cpg_handle, &my_nodeid);
+
+	/* control the limits */
+	if (my_msgs_to_send <= 0)
+		my_msgs_to_send = 1;
+	if (my_msgs_to_send > 10000)
+		my_msgs_to_send = 10000;
+
+	send_some_more_messages_zcb ();
 }
 

@@ -477,6 +575,10 @@ static void do_command (int sock, char* func, char*args[], int num_args)
 
 		read_messages (sock, args[0]);
 
+	} else if (strcmp ("msg_blaster_zcb", func) == 0) {
+
+		msg_blaster_zcb (sock, args[0]);
+
 	} else if (strcmp ("msg_blaster",func) == 0) {
 
 		msg_blaster (sock, args[0]);
diff --git a/cts/corotests.py b/cts/corotests.py
index 1d80722..e876602 100644
--- a/cts/corotests.py
+++ b/cts/corotests.py
@@ -378,7 +378,7 @@ class CpgMsgOrderBase(CoroTest):
 ###################################################################
 class CpgMsgOrderBasic(CpgMsgOrderBase):
     '''
-    each sends & logs 1000 messages
+    each sends & logs lots of messages
     '''
     def __init__(self, cm):
         CpgMsgOrderBase.__init__(self,cm)
@@ -387,9 +387,26 @@ class CpgMsgOrderBasic(CpgMsgOrderBase):
 
     def __call__(self, node):
         self.incr("calls")
-        self.cpg_msg_blaster()
+        for n in self.CM.Env["nodes"]:
+            self.CM.cpg_agent[n].msg_blaster(self.num_msgs_per_node)
+
         return self.wait_and_validate_order()
 
+###################################################################
+class CpgMsgOrderZcb(CpgMsgOrderBase):
+    '''
+    each sends & logs lots of messages via the zero-copy (zcb) multicast path
+    '''
+    def __init__(self, cm):
+        CpgMsgOrderBase.__init__(self,cm)
+        self.name="CpgMsgOrderZcb"
+        self.num_msgs_per_node = 9000
+
+    def __call__(self, node):
+        self.incr("calls")
+        for n in self.CM.Env["nodes"]:
+            self.CM.cpg_agent[n].msg_blaster_zcb(self.num_msgs_per_node)
+        return self.wait_and_validate_order()
 
 ###################################################################
 class MemLeakObject(CoroTest):
@@ -568,6 +585,7 @@ class ConfdbNotificationTest(CoroTest):
 
 GenTestClasses = []
 GenTestClasses.append(CpgMsgOrderBasic)
+GenTestClasses.append(CpgMsgOrderZcb)
 GenTestClasses.append(CpgCfgChgOnExecCrash)
 GenTestClasses.append(CpgCfgChgOnGroupLeave)
 GenTestClasses.append(CpgCfgChgOnNodeLeave)
@@ -608,6 +626,8 @@ def CoroTestList(cm, audits):
     default['logging/logfile_priority'] = 'info'
     default['logging/syslog_priority'] = 'info'
     default['logging/syslog_facility'] = 'daemon'
+    default['uidgid/uid'] = '0'
+    default['uidgid/gid'] = '0'
     configs.append(default)
 
     a = {}
-- 
1.6.6.1




More information about the Openais mailing list