[Openais] [PATCH 5/7] Propagate the flow control state between AIS exec and library
Steven Dake
sdake at redhat.com
Tue Aug 19 16:22:38 PDT 2008
Let's hold off on this patch until I finish the IPC forward port, please.
Thanks
-steve
On Wed, 2008-08-20 at 06:54 +1200, angus salkeld wrote:
> This patch makes the library set its flow control state correctly
> when flow control is turned off (disabled), so that the state can
> then be read correctly through the flow control APIs.
> This also fixes the case where the application is no longer sending
> messages and it has already dispatched all its received messages
> before flow control is disabled.
>
> Also, CPG response messages with a TRY_AGAIN error did NOT contain
> a valid flow control state value. This meant the library could get
> stuck with flow control enabled (flow control was never enabled
> in the EXEC, so no disable event ever occurred).
> This case was hit when a new node was joining: sync_in_process()
> caused cpg_mcast_joined() to return a TRY_AGAIN error.
>
> Also, in message_handler_req_exec_cpg_mcast() the state passed
> back to the library defaulted to disabled for messages received
> from another node (even if flow control was still enabled).
> This meant that if multiple nodes were sending CPG messages,
> the library flow control state flip-flopped between
> enabled and disabled.
>
> Author: Steven Dake <sdake at redhat.com> &
> Tim Beale <tim.beale at alliedtelesis.co.nz>
> ---
> exec/apidef.c | 1 +
> exec/ipc.c | 17 ++++++++++++++++-
> exec/ipc.h | 2 ++
> include/corosync/engine/coroapi.h | 2 ++
> include/corosync/ipc_cpg.h | 8 +++++++-
> lib/cpg.c | 19 ++++++++++++++++---
> services/cpg.c | 23 ++++++++++++++++++++---
> 7 files changed, 64 insertions(+), 8 deletions(-)
>
> diff --git a/exec/apidef.c b/exec/apidef.c
> index d1dfde3..7fb1852 100644
> --- a/exec/apidef.c
> +++ b/exec/apidef.c
> @@ -69,6 +69,7 @@ static struct corosync_api_v1 apidef_corosync_api_v1 = {
> .ipc_source_is_local = message_source_is_local,
> .ipc_private_data_get = corosync_conn_private_data_get,
> .ipc_response_send = NULL,
> + .ipc_response_no_fcc = corosync_conn_send_response_no_fcc,
> .ipc_dispatch_send = NULL,
> .ipc_conn_send_response = corosync_conn_send_response,
> .ipc_conn_partner_get = corosync_conn_partner_get,
> diff --git a/exec/ipc.c b/exec/ipc.c
> index cccbcbb..facc75e 100644
> --- a/exec/ipc.c
> +++ b/exec/ipc.c
> @@ -111,6 +111,8 @@ LOGSYS_DECLARE_SUBSYS ("IPC", LOG_INFO);
>
> static unsigned int g_gid_valid = 0;
>
> +static unsigned int dont_call_flow_control = 0;
> +
> static totempg_groups_handle ipc_handle;
>
> DECLARE_LIST_INIT (conn_info_list_head);
> @@ -1125,6 +1127,17 @@ void *corosync_conn_partner_get (void *conn)
> }
> }
>
> +int corosync_conn_send_response_no_fcc (
> + void *conn,
> + void *msg,
> + int mlen)
> +{ /* Same as corosync_conn_send_response () but skips ipc_flow_control () */
> + dont_call_flow_control = 1;
> + int res = corosync_conn_send_response (conn, msg, mlen);
> + dont_call_flow_control = 0;
> + return (res); /* propagate result; previously fell off end of int fn */
> +}
> +
> int corosync_conn_send_response (
> void *conn,
> void *msg,
> @@ -1149,7 +1162,9 @@ int corosync_conn_send_response (
> return (-1);
> }
>
> - ipc_flow_control (conn_info);
> + if (dont_call_flow_control == 0) {
> + ipc_flow_control (conn_info);
> + }
>
> outq = &conn_info->outq;
>
> diff --git a/exec/ipc.h b/exec/ipc.h
> index a29a698..fc24241 100644
> --- a/exec/ipc.h
> +++ b/exec/ipc.h
> @@ -52,6 +52,8 @@ extern void *corosync_conn_private_data_get (void *conn);
>
> extern int corosync_conn_send_response (void *conn, void *msg, int mlen);
>
> +extern int corosync_conn_send_response_no_fcc (void *conn, void *msg, int mlen);
> +
> extern void corosync_ipc_init (
> void (*serialize_lock_fn) (void),
> void (*serialize_unlock_fn) (void),
> diff --git a/include/corosync/engine/coroapi.h b/include/corosync/engine/coroapi.h
> index 0d6c60b..c46338e 100644
> --- a/include/corosync/engine/coroapi.h
> +++ b/include/corosync/engine/coroapi.h
> @@ -322,6 +322,8 @@ struct corosync_api_v1 {
>
> int (*ipc_response_send) (void *conn, void *msg, int mlen);
>
> + int (*ipc_response_no_fcc) (void *conn, void *msg, int mlen);
> +
> int (*ipc_dispatch_send) (void *conn, void *msg, int mlen);
>
> /*
> diff --git a/include/corosync/ipc_cpg.h b/include/corosync/ipc_cpg.h
> index 32b8544..c1e68be 100644
> --- a/include/corosync/ipc_cpg.h
> +++ b/include/corosync/ipc_cpg.h
> @@ -62,7 +62,8 @@ enum res_cpg_types {
> MESSAGE_RES_CPG_FLOW_CONTROL_STATE_SET = 8,
> MESSAGE_RES_CPG_LOCAL_GET = 9,
> MESSAGE_RES_CPG_GROUPS_GET = 10,
> - MESSAGE_RES_CPG_GROUPS_CALLBACK = 11
> + MESSAGE_RES_CPG_GROUPS_CALLBACK = 11,
> + MESSAGE_RES_CPG_FLOWCONTROL_CALLBACK = 12
> };
>
> enum lib_cpg_confchg_reason {
> @@ -135,6 +136,11 @@ struct res_lib_cpg_deliver_callback {
> mar_uint8_t message[] __attribute__((aligned(8)));
> };
>
> +struct res_lib_cpg_flowcontrol_callback { /* payload of MESSAGE_RES_CPG_FLOWCONTROL_CALLBACK */
> + mar_res_header_t header __attribute__((aligned(8)));
> + mar_uint32_t flow_control_state __attribute__((aligned(8))); /* copied into cpg_inst->flow_control_state by cpg_dispatch () */
> +};
> +
> struct req_lib_cpg_membership {
> mar_req_header_t header __attribute__((aligned(8)));
> mar_cpg_name_t group_name __attribute__((aligned(8)));
> diff --git a/lib/cpg.c b/lib/cpg.c
> index 6509b7a..89390f7 100644
> --- a/lib/cpg.c
> +++ b/lib/cpg.c
> @@ -248,6 +248,7 @@ cpg_error_t cpg_dispatch (
> int cont = 1; /* always continue do loop except when set to 0 */
> int dispatch_avail;
> struct cpg_inst *cpg_inst;
> + struct res_lib_cpg_flowcontrol_callback *res_cpg_flowcontrol_callback;
> struct res_lib_cpg_confchg_callback *res_cpg_confchg_callback;
> struct res_lib_cpg_deliver_callback *res_cpg_deliver_callback;
> struct res_lib_cpg_groups_get_callback *res_lib_cpg_groups_get_callback;
> @@ -397,6 +398,7 @@ cpg_error_t cpg_dispatch (
> joined_list,
> res_cpg_confchg_callback->joined_list_entries);
> break;
> +
> case MESSAGE_RES_CPG_GROUPS_CALLBACK:
> res_lib_cpg_groups_get_callback = (struct res_lib_cpg_groups_get_callback *)&dispatch_data;
> marshall_from_mar_cpg_name_t (
> @@ -413,6 +415,12 @@ cpg_error_t cpg_dispatch (
> &group_name,
> member_list,
> res_lib_cpg_groups_get_callback->num_members);
> +
> + break;
> +
> + case MESSAGE_RES_CPG_FLOWCONTROL_CALLBACK:
> + res_cpg_flowcontrol_callback = (struct res_lib_cpg_flowcontrol_callback *)&dispatch_data;
> + cpg_inst->flow_control_state = res_cpg_flowcontrol_callback->flow_control_state;
> break;
>
> default:
> @@ -598,9 +606,14 @@ cpg_error_t cpg_mcast_joined (
> goto error_exit;
> }
>
> - cpg_inst->flow_control_state = res_lib_cpg_mcast.flow_control_state;
> - if (res_lib_cpg_mcast.header.error == CPG_ERR_TRY_AGAIN) {
> - cpg_inst->flow_control_state = CPG_FLOW_CONTROL_ENABLED;
> +/* Only update the flow control state when the return value is OK.
> + * Otherwise the flow control state is not guaranteed to be valid in the
> + * return message.
> + * Also, don't set to ENABLED if the return value is TRY_AGAIN as this can lead
> + * to Flow Control State sync issues between AIS LIB and EXEC.
> + */
> + if (res_lib_cpg_mcast.header.error == CPG_OK) {
> + cpg_inst->flow_control_state = res_lib_cpg_mcast.flow_control_state;
> }
> error = res_lib_cpg_mcast.header.error;
>
> diff --git a/services/cpg.c b/services/cpg.c
> index 82c426f..d7c179c 100644
> --- a/services/cpg.c
> +++ b/services/cpg.c
> @@ -738,9 +738,28 @@ static void cpg_flow_control_state_set_fn (
> void *context,
> enum corosync_flow_control_state flow_control_state)
> {
> + struct res_lib_cpg_flowcontrol_callback res_lib_cpg_flowcontrol_callback;
> struct process_info *process_info = (struct process_info *)context;
>
> process_info->flow_control_state = flow_control_state;
> + /*
> + * Send disabled flow control if a disabled occurs. This prevents
> + * the condition where a disabled occurs after all messages have been
> + * delivered and then there is no valid way to retrieve the flow
> + * control state
> + */
> + if (flow_control_state == CPG_FLOW_CONTROL_DISABLED) {
> + res_lib_cpg_flowcontrol_callback.header.id = MESSAGE_RES_CPG_FLOWCONTROL_CALLBACK;
> + res_lib_cpg_flowcontrol_callback.header.size = sizeof (struct res_lib_cpg_flowcontrol_callback);
> + res_lib_cpg_flowcontrol_callback.flow_control_state = flow_control_state;
> +
> + if (process_info->trackerconn) {
> + api->ipc_response_no_fcc (
> + process_info->trackerconn,
> + &res_lib_cpg_flowcontrol_callback,
> + sizeof (struct res_lib_cpg_flowcontrol_callback));
> + }
> + }
> }
>
> /* Can byteswap join & leave messages */
> @@ -965,7 +984,6 @@ static void message_handler_req_exec_cpg_mcast (
> {
> struct req_exec_cpg_mcast *req_exec_cpg_mcast = (struct req_exec_cpg_mcast *)message;
> struct res_lib_cpg_deliver_callback *res_lib_cpg_mcast;
> - struct process_info *process_info;
> int msglen = req_exec_cpg_mcast->msglen;
> char buf[sizeof(*res_lib_cpg_mcast) + msglen];
> struct group_info *gi;
> @@ -986,8 +1004,6 @@ static void message_handler_req_exec_cpg_mcast (
> res_lib_cpg_mcast->flow_control_state = CPG_FLOW_CONTROL_DISABLED;
> if (api->ipc_source_is_local (&req_exec_cpg_mcast->source)) {
> api->ipc_refcnt_dec (req_exec_cpg_mcast->source.conn);
> - process_info = (struct process_info *)api->ipc_private_data_get (req_exec_cpg_mcast->source.conn);
> - res_lib_cpg_mcast->flow_control_state = process_info->flow_control_state;
> }
> memcpy(&res_lib_cpg_mcast->group_name, &gi->group_name,
> sizeof(mar_cpg_name_t));
> @@ -998,6 +1014,7 @@ static void message_handler_req_exec_cpg_mcast (
> for (iter = gi->members.next; iter != &gi->members; iter = iter->next) {
> struct process_info *pi = list_entry(iter, struct process_info, list);
> if (pi->trackerconn && (pi->flags & PI_FLAG_MEMBER)) {
> + res_lib_cpg_mcast->flow_control_state = pi->flow_control_state;
> api->ipc_conn_send_response(
> pi->trackerconn,
> buf,
More information about the Openais
mailing list