[Openais] [PATCH 5/7] Propagate the flow control state between AIS exec and library

Steven Dake sdake at redhat.com
Tue Aug 19 16:22:38 PDT 2008


Let's hold off on this patch until I finish the IPC forward port, please.

Thanks
-steve

On Wed, 2008-08-20 at 06:54 +1200, angus salkeld wrote:
> This patch makes the library update its flow control state correctly
> when flow control is turned off (disabled), so that the state can then
> be read correctly through the flow control APIs.
> This also fixes the case where the application is no longer sending
> messages and it has already dispatched all its received messages
> before flow control is disabled.
> 
> Also, CPG response messages with a TRY_AGAIN error did NOT contain
> a valid flow control state value. This meant the library could get
> stuck with flow control enabled (flow control was never enabled
> for the EXEC, so no disable event occurred).
> This case was hit when a new node was joining: sync_in_process()
> caused cpg_mcast_joined() to fail with a TRY_AGAIN error.
> 
> Also, in message_handler_req_exec_cpg_mcast() the state passed
> back to the library defaulted to disabled for messages received
> from another node (even if flow control was still enabled)
> - this meant if multiple nodes were sending CPG messages,
>   then the library flow control state flip-flopped between
>   enabled and disabled.
> 
> Author: Steven Dake <sdake at redhat.com> &
>         Tim Beale <tim.beale at alliedtelesis.co.nz>
> ---
>  exec/apidef.c                     |    1 +
>  exec/ipc.c                        |   17 ++++++++++++++++-
>  exec/ipc.h                        |    2 ++
>  include/corosync/engine/coroapi.h |    2 ++
>  include/corosync/ipc_cpg.h        |    8 +++++++-
>  lib/cpg.c                         |   19 ++++++++++++++++---
>  services/cpg.c                    |   23 ++++++++++++++++++++---
>  7 files changed, 64 insertions(+), 8 deletions(-)
> 
> diff --git a/exec/apidef.c b/exec/apidef.c
> index d1dfde3..7fb1852 100644
> --- a/exec/apidef.c
> +++ b/exec/apidef.c
> @@ -69,6 +69,7 @@ static struct corosync_api_v1 apidef_corosync_api_v1 = {
>  	.ipc_source_is_local = message_source_is_local,
>  	.ipc_private_data_get = corosync_conn_private_data_get,
>  	.ipc_response_send = NULL,
> +	.ipc_response_no_fcc = corosync_conn_send_response_no_fcc,
>  	.ipc_dispatch_send = NULL,
>  	.ipc_conn_send_response = corosync_conn_send_response,
>  	.ipc_conn_partner_get = corosync_conn_partner_get,
> diff --git a/exec/ipc.c b/exec/ipc.c
> index cccbcbb..facc75e 100644
> --- a/exec/ipc.c
> +++ b/exec/ipc.c
> @@ -111,6 +111,8 @@ LOGSYS_DECLARE_SUBSYS ("IPC", LOG_INFO);
>  
>  static unsigned int g_gid_valid = 0;
>  
> +static unsigned int dont_call_flow_control = 0;	/* set by corosync_conn_send_response_no_fcc() around its send; makes corosync_conn_send_response() skip ipc_flow_control(). NOTE(review): file-global, so not safe if sends can nest or run concurrently - confirm */
> +
>  static totempg_groups_handle ipc_handle;
>  
>  DECLARE_LIST_INIT (conn_info_list_head);
> @@ -1125,6 +1127,17 @@ void *corosync_conn_partner_get (void *conn)
>  	}
>  }
>  
> +int corosync_conn_send_response_no_fcc (
> +	void *conn, void *msg, int mlen)
> +{
> +	/* Like corosync_conn_send_response() but skips ipc_flow_control(). */
> +	int res;
> +	dont_call_flow_control = 1;
> +	res = corosync_conn_send_response (conn, msg, mlen);
> +	dont_call_flow_control = 0;
> +	return (res);	/* int function: hand the send status (-1 on error) back */
> +}
> +
>  int corosync_conn_send_response (
>  	void *conn,
>  	void *msg,
> @@ -1149,7 +1162,9 @@ int corosync_conn_send_response (
>  		return (-1);
>  	}
>  
> -	ipc_flow_control (conn_info);
> +	if (dont_call_flow_control == 0) {
> +		ipc_flow_control (conn_info);
> +	}
>  
>  	outq = &conn_info->outq;
>  
> diff --git a/exec/ipc.h b/exec/ipc.h
> index a29a698..fc24241 100644
> --- a/exec/ipc.h
> +++ b/exec/ipc.h
> @@ -52,6 +52,8 @@ extern void *corosync_conn_private_data_get (void *conn);
>  
>  extern int corosync_conn_send_response (void *conn, void *msg, int mlen);
>  
> +extern int corosync_conn_send_response_no_fcc (void *conn, void *msg, int mlen);
> +
>  extern void corosync_ipc_init (
>          void (*serialize_lock_fn) (void),
>          void (*serialize_unlock_fn) (void),
> diff --git a/include/corosync/engine/coroapi.h b/include/corosync/engine/coroapi.h
> index 0d6c60b..c46338e 100644
> --- a/include/corosync/engine/coroapi.h
> +++ b/include/corosync/engine/coroapi.h
> @@ -322,6 +322,8 @@ struct corosync_api_v1 {
>  
>  	int (*ipc_response_send) (void *conn, void *msg, int mlen);
>  
> +	int (*ipc_response_no_fcc) (void *conn, void *msg, int mlen);
> +
>  	int (*ipc_dispatch_send) (void *conn, void *msg, int mlen);
>  
>  	/*
> diff --git a/include/corosync/ipc_cpg.h b/include/corosync/ipc_cpg.h
> index 32b8544..c1e68be 100644
> --- a/include/corosync/ipc_cpg.h
> +++ b/include/corosync/ipc_cpg.h
> @@ -62,7 +62,8 @@ enum res_cpg_types {
>  	MESSAGE_RES_CPG_FLOW_CONTROL_STATE_SET = 8,
>  	MESSAGE_RES_CPG_LOCAL_GET = 9,
>  	MESSAGE_RES_CPG_GROUPS_GET = 10,
> -	MESSAGE_RES_CPG_GROUPS_CALLBACK = 11
> +	MESSAGE_RES_CPG_GROUPS_CALLBACK = 11,
> +	MESSAGE_RES_CPG_FLOWCONTROL_CALLBACK = 12
>  };
>  
>  enum lib_cpg_confchg_reason {
> @@ -135,6 +136,11 @@ struct res_lib_cpg_deliver_callback {
>  	mar_uint8_t message[] __attribute__((aligned(8)));
>  };
>  
> +struct res_lib_cpg_flowcontrol_callback {	/* exec -> lib message, header.id = MESSAGE_RES_CPG_FLOWCONTROL_CALLBACK */
> +	mar_res_header_t header __attribute__((aligned(8)));
> +	mar_uint32_t flow_control_state __attribute__((aligned(8)));	/* cpg_dispatch() copies this into cpg_inst->flow_control_state */
> +};
> +
>  struct req_lib_cpg_membership {
>  	mar_req_header_t header __attribute__((aligned(8)));
>  	mar_cpg_name_t group_name __attribute__((aligned(8)));
> diff --git a/lib/cpg.c b/lib/cpg.c
> index 6509b7a..89390f7 100644
> --- a/lib/cpg.c
> +++ b/lib/cpg.c
> @@ -248,6 +248,7 @@ cpg_error_t cpg_dispatch (
>  	int cont = 1; /* always continue do loop except when set to 0 */
>  	int dispatch_avail;
>  	struct cpg_inst *cpg_inst;
> +	struct res_lib_cpg_flowcontrol_callback *res_cpg_flowcontrol_callback;
>  	struct res_lib_cpg_confchg_callback *res_cpg_confchg_callback;
>  	struct res_lib_cpg_deliver_callback *res_cpg_deliver_callback;
>  	struct res_lib_cpg_groups_get_callback *res_lib_cpg_groups_get_callback;
> @@ -397,6 +398,7 @@ cpg_error_t cpg_dispatch (
>  				joined_list,
>  				res_cpg_confchg_callback->joined_list_entries);
>  			break;
> +
>  		case MESSAGE_RES_CPG_GROUPS_CALLBACK:
>  			res_lib_cpg_groups_get_callback = (struct res_lib_cpg_groups_get_callback *)&dispatch_data;
>  			marshall_from_mar_cpg_name_t (
> @@ -413,6 +415,12 @@ cpg_error_t cpg_dispatch (
>  						    &group_name,
>  						    member_list,
>  						    res_lib_cpg_groups_get_callback->num_members);
> +
> +			break;
> +
> +		case MESSAGE_RES_CPG_FLOWCONTROL_CALLBACK:
> +			res_cpg_flowcontrol_callback = (struct res_lib_cpg_flowcontrol_callback *)&dispatch_data;
> +			cpg_inst->flow_control_state = res_cpg_flowcontrol_callback->flow_control_state;
>  			break;
>  
>  		default:
> @@ -598,9 +606,14 @@ cpg_error_t cpg_mcast_joined (
>  		goto error_exit;
>  	}
>  
> -	cpg_inst->flow_control_state = res_lib_cpg_mcast.flow_control_state;
> -	if (res_lib_cpg_mcast.header.error == CPG_ERR_TRY_AGAIN) {
> -		cpg_inst->flow_control_state = CPG_FLOW_CONTROL_ENABLED;
> +/*	Only update the flow control state when the return value is OK.
> + *	Otherwise the flow control state is not guaranteed to be valid in the
> + *	return message.
> + *	Also, don't set to ENABLED if the return value is TRY_AGAIN as this can lead
> + *	to Flow Control State sync issues between AIS LIB and EXEC.
> + */
> +	if (res_lib_cpg_mcast.header.error == CPG_OK) {
> +		cpg_inst->flow_control_state = res_lib_cpg_mcast.flow_control_state;
>  	}
>  	error = res_lib_cpg_mcast.header.error;
>  
> diff --git a/services/cpg.c b/services/cpg.c
> index 82c426f..d7c179c 100644
> --- a/services/cpg.c
> +++ b/services/cpg.c
> @@ -738,9 +738,28 @@ static void cpg_flow_control_state_set_fn (
>  	void *context,
>  	enum corosync_flow_control_state flow_control_state)
>  {
> +	struct res_lib_cpg_flowcontrol_callback res_lib_cpg_flowcontrol_callback;
>  	struct process_info *process_info = (struct process_info *)context;
>  
>  	process_info->flow_control_state = flow_control_state;
> +	/*
> +	 * Send disabled flow control if a disabled occurs.  This prevents
> +	 * the condition where a disabled occurs after all messages have been
> +	 * delivered and then there is no valid way to retrieve the flow
> +	 * control state
> +	 */
> +	if (flow_control_state == CPG_FLOW_CONTROL_DISABLED) {
> +		res_lib_cpg_flowcontrol_callback.header.id = MESSAGE_RES_CPG_FLOWCONTROL_CALLBACK;
> +		res_lib_cpg_flowcontrol_callback.header.size = sizeof (struct res_lib_cpg_flowcontrol_callback);
> +		res_lib_cpg_flowcontrol_callback.flow_control_state = flow_control_state;
> +
> +		if (process_info->trackerconn) {
> +			api->ipc_response_no_fcc (
> +				process_info->trackerconn,
> +				&res_lib_cpg_flowcontrol_callback,
> +				sizeof (struct res_lib_cpg_flowcontrol_callback));
> +		}
> +	}
>  }
>  
>  /* Can byteswap join & leave messages */
> @@ -965,7 +984,6 @@ static void message_handler_req_exec_cpg_mcast (
>  {
>  	struct req_exec_cpg_mcast *req_exec_cpg_mcast = (struct req_exec_cpg_mcast *)message;
>  	struct res_lib_cpg_deliver_callback *res_lib_cpg_mcast;
> -	struct process_info *process_info;
>  	int msglen = req_exec_cpg_mcast->msglen;
>  	char buf[sizeof(*res_lib_cpg_mcast) + msglen];
>  	struct group_info *gi;
> @@ -986,8 +1004,6 @@ static void message_handler_req_exec_cpg_mcast (
>  	res_lib_cpg_mcast->flow_control_state = CPG_FLOW_CONTROL_DISABLED;
>  	if (api->ipc_source_is_local (&req_exec_cpg_mcast->source)) {
>  		api->ipc_refcnt_dec (req_exec_cpg_mcast->source.conn);
> -		process_info = (struct process_info *)api->ipc_private_data_get (req_exec_cpg_mcast->source.conn);
> -		res_lib_cpg_mcast->flow_control_state = process_info->flow_control_state;
>  	}
>  	memcpy(&res_lib_cpg_mcast->group_name, &gi->group_name,
>  		sizeof(mar_cpg_name_t));
> @@ -998,6 +1014,7 @@ static void message_handler_req_exec_cpg_mcast (
>  	for (iter = gi->members.next; iter != &gi->members; iter = iter->next) {
>  		struct process_info *pi = list_entry(iter, struct process_info, list);
>  		if (pi->trackerconn && (pi->flags & PI_FLAG_MEMBER)) {
> +			res_lib_cpg_mcast->flow_control_state = pi->flow_control_state;
>  			api->ipc_conn_send_response(
>  				pi->trackerconn,
>  				buf,



More information about the Openais mailing list