[Openais] [PATCH 1/3] Stats-2 Add connection related statistics to the object db.

Angus Salkeld angus.salkeld at gmail.com
Sun Oct 11 01:36:41 PDT 2009


On Fri, 9 Oct 2009, Steven Dake wrote:

steve> comments inline.  Great work, although needs a bit of rework for api
steve> compat reasons.

Hi

Here is an improved patch that includes the changes
you wanted.

Note: there are some portability issues, as you pointed out.
1) req_setup_recv() needs to get the PID of the connected
   process.
   Currently only Linux is tested; BSD is supported but not
   tested, and I don't know how to do this on OS X.

2) coroipcs_init_conn_stats() needs to get the process
   name from the PID.
   Currently it uses /proc/<pid>/stat.
   If anyone has a better (more portable) way of doing this,
   I'd like to know.

Steve: I could commit this now and fix it later; otherwise I need
some help testing on these OSes.


Regards
Angus Salkeld


Index: include/corosync/coroipcs.h
===================================================================
--- include/corosync/coroipcs.h	(revision 2511)
+++ include/corosync/coroipcs.h	(working copy)
@@ -74,9 +74,21 @@
 	coroipcs_handler_fn_lvalue (*handler_fn_get)(unsigned int service, unsigned int id);
 };
 
+struct coroipcs_init_stats_state {
+	/* create a connection's stats; the returned handle is passed to the calls below */
+	hdb_handle_t (*stats_create_connection) (const char *name, pid_t pid, int fd);
+	void (*stats_destroy_connection) (hdb_handle_t handle);
+	/* set the named value to the value_len bytes at value */
+	void (*stats_update_value) (hdb_handle_t handle, const char *name, const void *value, size_t value_len);
+	void (*stats_increment_value) (hdb_handle_t handle, const char *name);
+};
+
 extern void coroipcs_ipc_init (
 	struct coroipcs_init_state *init_state);
 
+extern void coroipcs_ipc_stats_init (
+        struct coroipcs_init_stats_state *init_stats_state);
+
 extern void *coroipcs_private_data_get (void *conn);
 
 extern int coroipcs_response_send (
Index: exec/coroipcs.c
===================================================================
--- exec/coroipcs.c	(revision 2511)
+++ exec/coroipcs.c	(working copy)
@@ -72,6 +72,7 @@
 #include <corosync/list.h>
 
 #include <corosync/coroipc_types.h>
+#include <corosync/hdb.h>
 #include <corosync/coroipcs.h>
 #include <corosync/coroipc_ipc.h>
 
@@ -91,6 +92,7 @@
 #define MSG_SEND_UNLOCKED	1
 
 static struct coroipcs_init_state *api;
+static struct coroipcs_init_stats_state *stats_api;	/* set by coroipcs_ipc_stats_init (); NOTE(review): NULL until then, but dereferenced without a check */
 
 DECLARE_LIST_INIT (conn_info_list_head);
 
@@ -130,11 +132,13 @@
 struct conn_info {
 	int fd;
 	pthread_t thread;
+	pid_t client_pid;
 	pthread_attr_t thread_attr;
 	unsigned int service;
 	enum conn_state state;
 	int notify_flow_control_enabled;
 	int refcount;
+	hdb_handle_t stats_handle;
 #if _POSIX_THREAD_PROCESS_SHARED < 1
 	key_t semkey;
 	int semid;
@@ -180,6 +184,8 @@
 retry_semop:
 	res = sem_post (&conn_info->control_buffer->sem0);
 	if (res == -1 && errno == EINTR) {
+		stats_api->stats_increment_value (conn_info->stats_handle,
+									 "sem_retry_count");
 		goto retry_semop;
 	}
 #else
@@ -190,6 +196,8 @@
 retry_semop:
 	res = semop (conn_info->semid, &sop, 1);
 	if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
+		stats_api->stats_increment_value (conn_info->stats_handle,
+									 "sem_retry_count");
 		goto retry_semop;
 	}
 #endif
@@ -437,6 +445,9 @@
 	 * Retry library exit function if busy
 	 */
 	if (conn_info->state == CONN_STATE_THREAD_DESTROYED) {
+
+		stats_api->stats_destroy_connection (conn_info->stats_handle);
+
 		res = api->exit_fn_get (conn_info->service) (conn_info);
 		if (res == -1) {
 			api->serialize_unlock ();
@@ -588,6 +599,8 @@
 			pthread_exit (0);
 		}
 		if ((res == -1) && (errno == EINTR)) {
+			stats_api->stats_increment_value (conn_info->stats_handle,
+										"sem_retry_count");
 			goto retry_semwait;
 		}
 #else
@@ -602,6 +615,8 @@
 		}
 		res = semop (conn_info->semid, &sop, 1);
 		if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
+			stats_api->stats_increment_value (conn_info->stats_handle,
+										"sem_retry_count");
 			goto retry_semop;
 		} else
 		if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
@@ -639,12 +654,17 @@
 		} else 
 		if (send_ok) {
 			api->serialize_lock();
+
+			stats_api->stats_increment_value (conn_info->stats_handle,
+										"requests");
 			api->handler_fn_get (conn_info->service, header->id) (conn_info, header);
 			api->serialize_unlock();
 		} else {
 			/*
 			 * Overload, tell library to retry
 			 */
+			stats_api->stats_increment_value (conn_info->stats_handle,
+										"sem_retry_count");
 			coroipc_response_header.size = sizeof (coroipc_response_header_t);
 			coroipc_response_header.id = 0;
 			coroipc_response_header.error = CS_ERR_TRY_AGAIN;
@@ -672,9 +692,13 @@
 retry_send:
 	res = send (conn_info->fd, &res_setup, sizeof (mar_res_setup_t), MSG_WAITALL);
 	if (res == -1 && errno == EINTR) {
+		stats_api->stats_increment_value (conn_info->stats_handle,
+									"send_retry_count");
 		goto retry_send;
 	} else
 	if (res == -1 && errno == EAGAIN) {
+		stats_api->stats_increment_value (conn_info->stats_handle,
+									"send_retry_count");
 		goto retry_send;
 	}
 	return (0);
@@ -719,6 +743,8 @@
 retry_recv:
 	res = recvmsg (conn_info->fd, &msg_recv, MSG_NOSIGNAL);
 	if (res == -1 && errno == EINTR) {
+		stats_api->stats_increment_value (conn_info->stats_handle,
+									"recv_retry_count");
 		goto retry_recv;
 	} else
 	if (res == -1 && errno != EAGAIN) {
@@ -753,6 +779,7 @@
 		if (getpeerucred (conn_info->fd, &uc) == 0) {
 			euid = ucred_geteuid (uc);
 			egid = ucred_getegid (uc);
+			conn_info->client_pid = ucred_getpid (uc);
 			if (api->security_valid (euid, egid)) {
 				authenticated = 1;
 			}
@@ -768,6 +795,10 @@
 		uid_t euid;
 		gid_t egid;
 
+		/*
+		 * TODO get the peer's pid.
+		 * conn_info->client_pid = ?;
+		 */
 		euid = -1;
 		egid = -1;
 		if (getpeereid (conn_info->fd, &euid, &egid) == 0) {
@@ -785,6 +816,7 @@
 	assert (cmsg);
 	cred = (struct ucred *)CMSG_DATA (cmsg);
 	if (cred) {
+		conn_info->client_pid = cred->pid;
 		if (api->security_valid (cred->uid, cred->gid)) {
 			authenticated = 1;
 		}
@@ -838,6 +870,7 @@
 	memset (conn_info, 0, sizeof (struct conn_info));
 
 	conn_info->fd = fd;
+	conn_info->client_pid = 0;
 	conn_info->service = SOCKET_SERVICE_INIT;
 	conn_info->state = CONN_STATE_THREAD_INACTIVE;
 	list_init (&conn_info->outq_head);
@@ -845,7 +878,7 @@
 	list_init (&conn_info->zcb_mapped_list_head);
 	list_add (&conn_info->list, &conn_info_list_head);
 
-        api->poll_dispatch_add (fd, conn_info);
+	api->poll_dispatch_add (fd, conn_info);
 
 	return (0);
 }
@@ -926,12 +959,18 @@
 #endif
 	listen (server_fd, SERVER_BACKLOG);
 
-        /*
-         * Setup connection dispatch routine
-         */
-        api->poll_accept_add (server_fd);
+	/*
+	 * Setup connection dispatch routine
+	 */
+	api->poll_accept_add (server_fd);
 }
 
+/* Install the callbacks coroipcs uses to publish per-connection stats. */
+void coroipcs_ipc_stats_init (struct coroipcs_init_stats_state *init_stats_state)
+{
+	stats_api = init_stats_state;
+}
+
 void coroipcs_ipc_exit (void)
 {
 	struct list_head *list;
@@ -1000,12 +1039,16 @@
 retry_semop:
 	res = semop (conn_info->semid, &sop, 1);
 	if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
+		stats_api->stats_increment_value (conn_info->stats_handle,
+									 "sem_retry_count");
 		goto retry_semop;
 	} else
 	if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
 		return (0);
 	}
 #endif
+	stats_api->stats_increment_value (conn_info->stats_handle,
+								"responses");
 	return (0);
 }
 
@@ -1038,12 +1081,16 @@
 retry_semop:
 	res = semop (conn_info->semid, &sop, 1);
 	if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
+		stats_api->stats_increment_value (conn_info->stats_handle,
+									"sem_retry_count");
 		goto retry_semop;
 	} else
 	if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
 		return (0);
 	}
 #endif
+	stats_api->stats_increment_value (conn_info->stats_handle,
+								"responses");
 	return (0);
 }
 
@@ -1115,12 +1162,16 @@
 retry_semop:
 	res = semop (conn_info->semid, &sop, 1);
 	if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
+		stats_api->stats_increment_value (conn_info->stats_handle,
+									"sem_retry_count");
 		goto retry_semop;
 	} else
 	if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
 		return;
 	}
 #endif
+	stats_api->stats_increment_value (conn_info->stats_handle,
+								"dispatched");
 }
 
 static void outq_flush (struct conn_info *conn_info) {
@@ -1173,9 +1224,13 @@
 		sizeof (mar_req_priv_change),
 		MSG_NOSIGNAL);
 	if (res == -1 && errno == EINTR) {
+		stats_api->stats_increment_value (conn_info->stats_handle,
+									"recv_retry_count");
 		goto retry_recv;
 	}
 	if (res == -1 && errno == EAGAIN) {
+		stats_api->stats_increment_value (conn_info->stats_handle,
+									"recv_retry_count");
 		goto retry_recv;
 	}
 	if (res == -1 && errno != EAGAIN) {
@@ -1346,6 +1401,69 @@
 	return (0);
 }
 
+/*
+ * Look up the name of process 'pid' from /proc/<pid>/stat (Linux only).
+ * The name is copied into out_name (name_len bytes), truncated if needed
+ * and always NUL terminated.  Returns out_name, or NULL on any failure.
+ */
+static char * pid_to_name (pid_t pid, char* out_name, size_t name_len)
+{
+	char *name;
+	char *rest;
+	FILE *fp;
+	char fname[32];
+	char buf[256];
+
+	if (out_name == NULL || name_len == 0) {
+		return NULL;
+	}
+	snprintf (fname, sizeof (fname), "/proc/%d/stat", (int)pid);
+	fp = fopen (fname, "r");
+	if (!fp) {
+		return NULL;
+	}
+	if (fgets (buf, sizeof (buf), fp) == NULL) {
+		fclose (fp);
+		return NULL;
+	}
+	fclose (fp);
+
+	/*
+	 * The line is "pid (name) state ...".  The name itself may contain
+	 * '(' or ')', so take the first '(' and the last ')'.
+	 */
+	name = strchr (buf, '(');
+	rest = strrchr (buf, ')');
+	if (name == NULL || rest == NULL || rest < name || rest[1] != ' ') {
+		return NULL;
+	}
+	name++;
+	*rest = '\0';
+
+	/* unlike strncpy + out_name[name_len], this stays inside out_name */
+	snprintf (out_name, name_len, "%s", name);
+	return out_name;
+}
+
+/* Create conn's stats object, named "name:pid:fd", "pid:fd" or "fd". */
+static void coroipcs_init_conn_stats (struct conn_info *conn)
+{
+	/* 31-char name + ':' + two 11-char ints + ':' + NUL fits in 64 bytes */
+	char conn_name[64];
+	char proc_name[32];
+
+	if (conn->client_pid <= 0) {
+		snprintf (conn_name, sizeof (conn_name), "%d", conn->fd);
+	} else if (pid_to_name (conn->client_pid, proc_name, sizeof (proc_name)) != NULL) {
+		snprintf (conn_name, sizeof (conn_name), "%s:%d:%d", proc_name, (int)conn->client_pid, conn->fd);
+	} else {
+		snprintf (conn_name, sizeof (conn_name), "%d:%d", (int)conn->client_pid, conn->fd);
+	}
+	conn->stats_handle = stats_api->stats_create_connection (conn_name, conn->client_pid, conn->fd);
+	stats_api->stats_update_value (conn->stats_handle, "service_id",
+		&conn->service, sizeof (conn->service));
+}
+
 int coroipcs_handler_dispatch (
 	int fd,
 	int revent,
@@ -1447,6 +1565,9 @@
 
 		api->init_fn_get (conn_info->service) (conn_info);
 
+		/* create stats objects */
+		coroipcs_init_conn_stats (conn_info);
+
 		pthread_attr_init (&conn_info->thread_attr);
 		/*
 		* IA64 needs more stack space then other arches
Index: exec/main.c
===================================================================
--- exec/main.c	(revision 2511)
+++ exec/main.c	(working copy)
@@ -129,6 +129,8 @@
 
 struct sched_param global_sched_param;
 
+static hdb_handle_t object_connection_handle;	/* runtime.connections object, created in corosync_stats_init () */
+
 hdb_handle_t corosync_poll_handle_get (void)
 {
 	return (corosync_poll_handle);
@@ -669,6 +671,105 @@
 		corosync_poll_handler_dispatch);
 }
 
+
+/*
+ * Create the stats object "name" for a new IPC connection under
+ * runtime.connections and bump that object's "active" total.
+ *
+ * "service_id" starts at 0 (coroipcs sets it through stats_update_value
+ * right after this returns), "client_pid" holds pid, and every other key
+ * is a UINT32 counter starting at 0.
+ *
+ * Returns the new object's handle, or 0 if the object could not be
+ * created.  "active" is bumped either way, so the matching destroy call
+ * still balances it.
+ */
+static hdb_handle_t corosync_stats_create_connection (const char* name,
+			const pid_t pid, const int fd)
+{
+	/* counters bumped through corosync_stats_increment_value () */
+	static const char * const counter_names[] = {
+		"responses",
+		"dispatched",
+		"requests",
+		"sem_retry_count",
+		"send_retry_count",
+		"recv_retry_count"
+	};
+	uint32_t zero_32 = 0;
+	int32_t client_pid = pid;
+	unsigned int key_incr_dummy;
+	hdb_handle_t object_handle = 0;
+	size_t i;
+
+	(void)fd;	/* not recorded; part of the coroipcs callback signature */
+
+	objdb->object_key_increment (object_connection_handle,
+		"active", strlen ("active"), &key_incr_dummy);
+
+	if (objdb->object_create (object_connection_handle,
+			&object_handle, name, strlen (name)) != 0) {
+		/* never hand back an indeterminate handle */
+		return 0;
+	}
+
+	objdb->object_key_create_typed (object_handle,
+		"service_id",
+		&zero_32,
+		sizeof (zero_32),
+		OBJDB_VALUETYPE_UINT32);
+
+	/* stored via an int32_t so the size always matches INT32 */
+	objdb->object_key_create_typed (object_handle,
+		"client_pid",
+		&client_pid,
+		sizeof (client_pid),
+		OBJDB_VALUETYPE_INT32);
+
+	/* Every counter is a zeroed UINT32 sized from zero_32 itself,
+	 * "requests" included, so all counters share one type. */
+	for (i = 0; i < sizeof (counter_names) / sizeof (counter_names[0]); i++) {
+		objdb->object_key_create_typed (object_handle,
+			counter_names[i],
+			&zero_32,
+			sizeof (zero_32),
+			OBJDB_VALUETYPE_UINT32);
+	}
+
+	return object_handle;
+}
+
+/*
+ * Destroy a connection's stats object and move it from the "active" to
+ * the "closed" total.  NOTE(review): coroipcs calls this before a busy
+ * exit_fn is retried, so it may run twice per connection -- confirm.
+ */
+static void corosync_stats_destroy_connection (hdb_handle_t handle)
+{
+	unsigned int ignored;
+
+	objdb->object_destroy (handle);
+	objdb->object_key_increment (object_connection_handle, "closed", strlen ("closed"), &ignored);
+	objdb->object_key_decrement (object_connection_handle, "active", strlen ("active"), &ignored);
+}
+
+/* Replace key "name" of a connection's stats object with value. */
+static void corosync_stats_update_value (hdb_handle_t handle,
+			const char* name, const void* value, const size_t value_len)
+{
+	const size_t name_len = strlen (name);
+	objdb->object_key_replace (handle, name, name_len, value, value_len);
+}
+
+/* Increment counter "name" of a connection's stats object. */
+static void corosync_stats_increment_value (hdb_handle_t handle,
+			const char* name)
+{
+	unsigned int result;	/* out-parameter, not used */
+
+	objdb->object_key_increment (handle, name, strlen (name), &result);
+}
+
 static struct coroipcs_init_state ipc_init_state = {
 	.socket_name			= COROSYNC_SOCKET_NAME,
 	.sched_policy			= SCHED_OTHER,
@@ -691,6 +792,12 @@
 	.exit_fn_get			= corosync_exit_fn_get,
 	.handler_fn_get			= corosync_handler_fn_get
 };
+static struct coroipcs_init_stats_state ipc_init_stats_state = {
+	/* per-connection stats callbacks, registered via coroipcs_ipc_stats_init () */
+	.stats_create_connection = corosync_stats_create_connection,
+	.stats_destroy_connection = corosync_stats_destroy_connection,
+	.stats_update_value = corosync_stats_update_value,
+	.stats_increment_value = corosync_stats_increment_value };
 
 static void corosync_setscheduler (void)
 {
@@ -734,6 +841,37 @@
 #endif
 }
 
+/*
+ * Create runtime.connections with zeroed UINT32 "active" and "closed"
+ * totals.  Does nothing more if "runtime" is missing or creation fails.
+ */
+static void corosync_stats_init (void)
+{
+	hdb_handle_t object_find_handle;
+	hdb_handle_t object_runtime_handle;
+	uint32_t zero_32 = 0;
+	int found;
+
+	objdb->object_find_create (OBJECT_PARENT_HANDLE,
+		"runtime", strlen ("runtime"), &object_find_handle);
+	found = objdb->object_find_next (object_find_handle,
+		&object_runtime_handle) == 0;
+	objdb->object_find_destroy (object_find_handle);	/* don't leak the iterator */
+	if (!found) {
+		return;
+	}
+
+	if (objdb->object_create (object_runtime_handle, &object_connection_handle,
+			"connections", strlen ("connections")) != 0) {
+		return;
+	}
+	objdb->object_key_create_typed (object_connection_handle,
+		"active", &zero_32, sizeof (zero_32), OBJDB_VALUETYPE_UINT32);
+	objdb->object_key_create_typed (object_connection_handle,
+		"closed", &zero_32, sizeof (zero_32), OBJDB_VALUETYPE_UINT32);
+}
+
+
 static void main_service_ready (void)
 {
 	int res;
@@ -746,6 +884,7 @@
 		corosync_exit_error (AIS_DONE_INIT_SERVICES);
 	}
 	evil_init (api);
+	corosync_stats_init ();
 }
 
 int main (int argc, char **argv)
@@ -765,6 +904,7 @@
 	int background, setprio;
 	struct stat stat_out;
 	char corosync_lib_dir[PATH_MAX];
+	hdb_handle_t object_runtime_handle;
 
 #if defined(HAVE_PTHREAD_SPIN_LOCK)
 	pthread_spin_init (&serialize_spin, 0);
@@ -977,6 +1117,11 @@
 		corosync_exit_error (AIS_DONE_MAINCONFIGREAD);
 	}
 
+	/* create the main runtime object */
+	objdb->object_create (OBJECT_PARENT_HANDLE,
+			&object_runtime_handle,
+			"runtime", strlen ("runtime"));
+
 	/*
 	 * Now we are fully initialized.
 	 */
@@ -1062,6 +1207,7 @@
 	}
 
 	coroipcs_ipc_init (&ipc_init_state);
+	coroipcs_ipc_stats_init (&ipc_init_stats_state);
 
 	/*
 	 * Start main processing loop



More information about the Openais mailing list