[Openais] [PATCH 2/7] CTS: add cpg_zcb tests

Steven Dake sdake at redhat.com
Mon Mar 29 22:38:46 PDT 2010


good for commit

On Tue, 2010-03-30 at 13:29 +1100, Angus Salkeld wrote:
> Add a test for the zcb cpg interface.
> 
> -Angus
> 
> Signed-off-by: Angus Salkeld <asalkeld at redhat.com>
> ---
>  cts/agents/cpg_test_agent.c |  108 +++++++++++++++++++++++++++++++++++++++++-
>  cts/corotests.py            |   24 +++++++++-
>  2 files changed, 127 insertions(+), 5 deletions(-)
> 
> diff --git a/cts/agents/cpg_test_agent.c b/cts/agents/cpg_test_agent.c
> index ac87af4..8c018b5 100644
> --- a/cts/agents/cpg_test_agent.c
> +++ b/cts/agents/cpg_test_agent.c
> @@ -88,6 +88,7 @@ static struct list_head msg_log_head;
>  static pid_t my_pid;
>  static uint32_t my_nodeid;
>  static int32_t my_seq;
> +static int32_t use_zcb = 0;
>  static int32_t my_msgs_to_send;
>  static int32_t total_stored_msgs = 0;
>  
> @@ -285,13 +286,80 @@ static void send_some_more_messages_later (void)
>  	cpg_dispatch (cpg_handle, CS_DISPATCH_ALL);
>  	poll_timer_add (
>  		ta_poll_handle_get(),
> -		100, NULL,
> +		300, NULL,
>  		send_some_more_messages,
>  		&timer_handle);
>  }
>  
> +
> +
> +static void send_some_more_messages_zcb (void)
> +{
> +	msg_t *my_msg;
> +	int i;
> +	int send_now;
> +	size_t payload_size;
> +	size_t total_size;
> +	hash_state sha1_hash;
> +	cs_error_t res;
> +	cpg_flow_control_state_t fc_state;
> +	void *zcb_buffer;
> +
> +	if (cpg_fd < 0)
> +		return;
> +
> +	send_now = my_msgs_to_send;
> +	payload_size = (rand() % 100000);
> +	total_size = payload_size + sizeof (msg_t);
> +	cpg_zcb_alloc (cpg_handle, total_size, &zcb_buffer);
> +
> +	my_msg = (msg_t*)zcb_buffer;
> +
> +	//syslog (LOG_DEBUG,"%s() send_now:%d", __func__, send_now);
> +	my_msg->pid = my_pid;
> +	my_msg->nodeid = my_nodeid;
> +	my_msg->size = sizeof (msg_t) + payload_size;
> +	my_msg->seq = 0;
> +	for (i = 0; i < payload_size; i++) {
> +		my_msg->payload[i] = i;
> +	}
> +	sha1_init (&sha1_hash);
> +	sha1_process (&sha1_hash, my_msg->payload, payload_size);
> +	sha1_done (&sha1_hash, my_msg->sha1);
> +
> +	for (i = 0; i < send_now; i++) {
> +
> +		res = cpg_flow_control_state_get (cpg_handle, &fc_state);
> +		if (res == CS_OK && fc_state == CPG_FLOW_CONTROL_ENABLED) {
> +			/* lets do this later */
> +			send_some_more_messages_later ();
> +			syslog (LOG_INFO, "%s() flow control enabled.", __func__);
> +			goto free_buffer;
> +		}
> +
> +		res = cpg_zcb_mcast_joined (cpg_handle, CPG_TYPE_AGREED, zcb_buffer, total_size);
> +		if (res == CS_ERR_TRY_AGAIN) {
> +			/* lets do this later */
> +			send_some_more_messages_later ();
> +			syslog (LOG_INFO, "%s() cpg_mcast_joined() says try again.",
> +				__func__);
> +			goto free_buffer;
> +		} else if (res != CS_OK) {
> +			syslog (LOG_ERR, "%s() -> cpg_mcast_joined error:%d, exiting.",
> +				__func__, res);
> +			exit (-2);
> +		}
> +
> +		my_msgs_to_send--;
> +	}
> +free_buffer:
> +	cpg_zcb_free (cpg_handle, zcb_buffer);
> +}
> +
> +
> +
>  static unsigned char buffer[200000];
> -static void send_some_more_messages (void * unused)
> +static void send_some_more_messages_normal (void)
>  {
>  	msg_t my_msg;
>  	struct iovec iov[2];
> @@ -353,12 +421,23 @@ static void send_some_more_messages (void * unused)
>  	}
>  }
>  
> +static void send_some_more_messages (void * unused)
> +{
> +	if (use_zcb) {
> +		send_some_more_messages_zcb ();
> +	} else {
> +		send_some_more_messages_normal ();
> +	}
> +}
> +
>  static void msg_blaster (int sock, char* num_to_send_str)
>  {
>  	my_msgs_to_send = atoi (num_to_send_str);
>  	my_seq = 1;
>  	my_pid = getpid();
>  
> +	use_zcb = 0;
> +
>  	cpg_local_get (cpg_handle, &my_nodeid);
>  
>  	/* control the limits */
> @@ -367,7 +446,26 @@ static void msg_blaster (int sock, char* num_to_send_str)
>  	if (my_msgs_to_send > 10000)
>  		my_msgs_to_send = 10000;
>  
> -	send_some_more_messages (NULL);
> +	send_some_more_messages_normal ();
> +}
> +
> +static void msg_blaster_zcb (int sock, char* num_to_send_str)
> +{
> +	my_msgs_to_send = atoi (num_to_send_str);
> +	my_seq = 1;
> +	my_pid = getpid();
> +
> +	use_zcb = 1;
> +
> +	cpg_local_get (cpg_handle, &my_nodeid);
> +
> +	/* control the limits */
> +	if (my_msgs_to_send <= 0)
> +		my_msgs_to_send = 1;
> +	if (my_msgs_to_send > 10000)
> +		my_msgs_to_send = 10000;
> +
> +	send_some_more_messages_zcb ();
>  }
>  
> 
> @@ -477,6 +575,10 @@ static void do_command (int sock, char* func, char*args[], int num_args)
>  
>  		read_messages (sock, args[0]);
>  
> +	} else if (strcmp ("msg_blaster_zcb", func) == 0) {
> +
> +		msg_blaster_zcb (sock, args[0]);
> +
>  	} else if (strcmp ("msg_blaster",func) == 0) {
>  
>  		msg_blaster (sock, args[0]);
> diff --git a/cts/corotests.py b/cts/corotests.py
> index 1d80722..e876602 100644
> --- a/cts/corotests.py
> +++ b/cts/corotests.py
> @@ -378,7 +378,7 @@ class CpgMsgOrderBase(CoroTest):
>  ###################################################################
>  class CpgMsgOrderBasic(CpgMsgOrderBase):
>      '''
> -    each sends & logs 1000 messages
> +    each sends & logs lots of messages
>      '''
>      def __init__(self, cm):
>          CpgMsgOrderBase.__init__(self,cm)
> @@ -387,9 +387,26 @@ class CpgMsgOrderBasic(CpgMsgOrderBase):
>  
>      def __call__(self, node):
>          self.incr("calls")
> -        self.cpg_msg_blaster()
> +        for n in self.CM.Env["nodes"]:
> +            self.CM.cpg_agent[n].msg_blaster(self.num_msgs_per_node)
> +
>          return self.wait_and_validate_order()
>  
> +###################################################################
> +class CpgMsgOrderZcb(CpgMsgOrderBase):
> +    '''
> +    each sends & logs lots of messages
> +    '''
> +    def __init__(self, cm):
> +        CpgMsgOrderBase.__init__(self,cm)
> +        self.name="CpgMsgOrderZcb"
> +        self.num_msgs_per_node = 9000
> +
> +    def __call__(self, node):
> +        self.incr("calls")
> +        for n in self.CM.Env["nodes"]:
> +            self.CM.cpg_agent[n].msg_blaster_zcb(self.num_msgs_per_node)
> +        return self.wait_and_validate_order()
>  
>  ###################################################################
>  class MemLeakObject(CoroTest):
> @@ -568,6 +585,7 @@ class ConfdbNotificationTest(CoroTest):
>  
>  GenTestClasses = []
>  GenTestClasses.append(CpgMsgOrderBasic)
> +GenTestClasses.append(CpgMsgOrderZcb)
>  GenTestClasses.append(CpgCfgChgOnExecCrash)
>  GenTestClasses.append(CpgCfgChgOnGroupLeave)
>  GenTestClasses.append(CpgCfgChgOnNodeLeave)
> @@ -608,6 +626,8 @@ def CoroTestList(cm, audits):
>      default['logging/logfile_priority'] = 'info'
>      default['logging/syslog_priority'] = 'info'
>      default['logging/syslog_facility'] = 'daemon'
> +    default['uidgid/uid'] = '0'
> +    default['uidgid/gid'] = '0'
>      configs.append(default)
>  
>      a = {}



More information about the Openais mailing list