[Openais] Re: just looked at some of the openais code, have some comments

Steven Dake sdake at mvista.com
Tue Jun 29 12:33:03 PDT 2004


Chris,

Find attached a copy of a patch that uses reference counting for
tracking users of a handle.

The ugly locking inside saHandleConvert (now replaced by saHandleGet and
saHandlePut) is gone and the code looks much cleaner.

Also, it is now possible to use an api call within a callback.

I have tested that dispatch operates correctly when Finalize is called
from another thread, and all of the APIs seem to operate correctly.

Some design notes:
1. saHandleCreate sets the reference count to 1 (this is called by the
initialize functions of each API)
2. saHandleGet will not allow a handle to be taken if the reference
count is 0 even if it is valid in the database
3. The finalize does a get to get the instance data, a put to release
the reference to the instance data, a put to release the reference from
the initialize call (so if there are no users, it is now 0), and finally
a wait for refcount to go to zero before deleting.

#3 is a little mystical. I think some comments can probably help, but the
code is less mystical than it was :)
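
To make the three design notes concrete, here is a minimal sketch of the
get/put/wait pattern. It is not the code in the patch: struct ref_handle
and the ref_handle_* functions are hypothetical stand-ins, and a single
handle with its own mutex and condition variable is assumed in place of
the handle database.

```c
#include <assert.h>
#include <pthread.h>

/* Hypothetical stand-in for one handle entry; not the openais types. */
struct ref_handle {
	pthread_mutex_t mutex;
	pthread_cond_t cond;
	int refcount;
	void *instance;
};

/* Note 1: creation holds the first reference on behalf of initialize. */
void ref_handle_create (struct ref_handle *h, void *instance)
{
	pthread_mutex_init (&h->mutex, NULL);
	pthread_cond_init (&h->cond, NULL);
	h->refcount = 1;
	h->instance = instance;
}

/* Note 2: a handle whose count has reached 0 can no longer be taken. */
int ref_handle_get (struct ref_handle *h, void **instance)
{
	int res = -1;

	pthread_mutex_lock (&h->mutex);
	if (h->refcount > 0) {
		h->refcount += 1;
		*instance = h->instance;
		res = 0;
	}
	pthread_mutex_unlock (&h->mutex);
	return (res);
}

/* Drop one reference and wake any waiter when the count reaches 0. */
void ref_handle_put (struct ref_handle *h)
{
	pthread_mutex_lock (&h->mutex);
	assert (h->refcount > 0);
	h->refcount -= 1;
	if (h->refcount == 0) {
		pthread_cond_broadcast (&h->cond);
	}
	pthread_mutex_unlock (&h->mutex);
}

/* Block until every outstanding user has called put. */
void ref_handle_wait_no_refs (struct ref_handle *h)
{
	pthread_mutex_lock (&h->mutex);
	while (h->refcount > 0) {
		pthread_cond_wait (&h->cond, &h->mutex);
	}
	pthread_mutex_unlock (&h->mutex);
}

/* Note 3: get, put, put again for the create reference, then wait. */
int ref_handle_finalize (struct ref_handle *h)
{
	void *instance;

	if (ref_handle_get (h, &instance) != 0) {
		return (-1);
	}
	/* instance resources would be torn down here */
	ref_handle_put (h);	/* releases the get above */
	ref_handle_put (h);	/* releases the reference from create */
	ref_handle_wait_no_refs (h);
	return (0);
}
```

In this sketch a thread blocked in dispatch still holds its reference, so
the finalize path waits until that thread notices the shutdown and calls
put; only then is it safe to delete the instance.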

If you have time and can take a look before I commit to bk, I'd
appreciate it.

Thanks!
-steve

On Tue, 2004-06-29 at 10:24, Chris Friesen wrote:
> Steven Dake wrote:
> > I am likely to make a
> > portability library of things like threads/condition variables/reference
> > counting for the library.  That library could conditionally noop the
> > code for threads if necessary.
> 
> Sounds good.
> 
> > I feel a little uncomfortable, as a library, changing signal handlers.
> > The SA Forum AIS WG is in process of specifying what exactly happens in
> > a signal handler (should APIs be allowed or not).  I'll take this input
> > to the specs group.
> 
> Yeah.  It's always a bit of a dilemma as to how to deal with signals.  You could 
> just put a stake in the ground and say that no library functions are async-safe, 
> so if the user wants to use them in signal handlers they would just have to 
> block signals around all library calls.
> 
> > You mean add a flag to saClmDispatch?  The downside of that approach is
> > that the API then becomes nonstandard for a small performance payoff.
> 
> Yes, that's what I had in mind. However, I had forgotten that there were 
> standards that you were coding to.
> 
> > I'd like to match the AIS standard as much as possible, even when it
> > means performance won't be right.  The longer-term approach to this
> > problem is to drive that kind of feedback into the specs group which
> > I'll do.
> 
> Cool.  That'd be great.
> 
> > I see the light now...  Refcounting is the way to go, especially for
> > handling dispatch and finalize calls.  Then the mutex won't have to be
> > held across callbacks and a bunch of other problems go away too.
> 
> I'm glad you like the refcount idea.  I don't know how familiar you are with the 
> kernel code, but they use refcounts all over the place.
> 
> I've been looking at the code a bit more and have a few other comments:
> 
> 1) In the test programs, you've got select() being called in while loops.  When 
> select() returns, the fd sets will be modified to reflect what fds had events 
> occur.  If you get a signal, you can exit select() with empty fd sets, and you 
> will no longer monitor anything.  You need to either copy the fd set to a 
> working one each loop, or else recreate the fd set each time through the loop.
> 
> 2) There's a small niggle in testamf.c, HealthcheckCallback().  The print 
> statement will run every time since the loop is commented out, but it prints 
> that 20 checks have occurred.
> 
> 3) It might be nice to be able to set the cluster name and node name.
> 
> 4) The magical version of
> 
>   SaVersionT version = { 'A', 1, 1 };
> 
> is a bit confusing.  I wonder if it could be made a constant or something rather 
> than having to be explicitly typed out by the user?
> 
> Chris
> 
> 
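
On comment 1 above, a minimal sketch of the "copy the fd set to a working
one each loop" approach. The function name wait_readable and its return
convention are hypothetical and are not taken from the test programs; the
only assumption about select() is that it overwrites the fd set it is
given, so the master set is kept untouched and copied before every call.

```c
#include <assert.h>
#include <errno.h>
#include <sys/select.h>
#include <unistd.h>

/*
 * Hypothetical helper: wait until fd is readable.
 * Returns 1 if readable, 0 on timeout, -1 on error.
 */
int wait_readable (int fd, int timeout_sec)
{
	fd_set master;
	fd_set working;
	struct timeval tv;
	int res;

	assert (fd >= 0 && fd < FD_SETSIZE);

	FD_ZERO (&master);
	FD_SET (fd, &master);

	for (;;) {
		/*
		 * select() modifies the set passed to it, so hand it a
		 * fresh copy and reset the timeout on every pass.
		 */
		working = master;
		tv.tv_sec = timeout_sec;
		tv.tv_usec = 0;

		res = select (fd + 1, &working, NULL, NULL, &tv);
		if (res == -1 && errno == EINTR) {
			continue; /* interrupted by a signal, retry */
		}
		if (res <= 0) {
			return (res);
		}
		return (FD_ISSET (fd, &working) ? 1 : 0);
	}
}
```

Because the master set is never passed to select(), a signal arriving
during the wait cannot leave the loop monitoring nothing.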
-------------- next part --------------
--- openais-cond-orig/lib/amf.c	2004-06-28 18:30:33.000000000 -0700
+++ openais-cond/lib/amf.c	2004-06-28 17:01:46.000000000 -0700
@@ -33,25 +33,6 @@
  * THE POSSIBILITY OF SUCH DAMAGE.
  */
 
-/*
- * thread locking model is as follows
- *
- *	APIs that use handles:
- *
- *	Every handle database has a lock.
- *	Each interface started with SaAmfInitialize has a lock.
- *	Handle database lock is taken.
- *	amfInstance lock is taken.
- *	Handle database lock is released early.
- *	amfInstance lock is released after amfInstance is out of use.
- *
- *	Finalize API:
- *	Handle database lock is taken
- *	amf instance lock is taken
- *	handle is removed
- *	amf instance lock is released
- *	handle database lock is released
- */
 #include <stdio.h>
 #include <string.h>
 #include <stdlib.h>
@@ -83,10 +64,11 @@
 	struct queue inq;
 	SaNameT compName;
 	int compRegistered;
-	struct message_overlay message;
+	int finalize;
 	pthread_mutex_t mutex;
+	struct saRefCount refCount;
 };
-#define AMFINSTANCE_MUTEX_OFFSET offset_of(struct amfInstance, mutex)
+#define AMFINSTANCE_REFCOUNT_OFFSET offset_of(struct amfInstance, refCount)
 
 /*
  * All instances in one database
@@ -94,8 +76,8 @@
 static struct saHandleDatabase amfHandleDatabase = {
 	handleCount: 0,
 	handles: 0,
-	generation: 0,
-	mutex: PTHREAD_MUTEX_INITIALIZER
+	mutex: PTHREAD_MUTEX_INITIALIZER,
+	offsetToRefCount: AMFINSTANCE_REFCOUNT_OFFSET
 };
 
 /*
@@ -170,14 +152,14 @@
 	struct amfInstance *amfInstance;
 	SaErrorT error;
 
-	error = saHandleConvert (&amfHandleDatabase, *amfHandle, (void *)&amfInstance, AMFINSTANCE_MUTEX_OFFSET, 0);
+	error = saHandleGet (&amfHandleDatabase, *amfHandle, (void *)&amfInstance);
 	if (error != SA_OK) {
 		return (error);
 	}
 
 	*selectionObject = amfInstance->fd;
 
-	pthread_mutex_unlock (&amfInstance->mutex);
+	saHandlePut (&amfHandleDatabase, *amfHandle);
 	return (SA_OK);
 }
 
@@ -202,12 +184,15 @@
 	int empty;
 	int ignore_dispatch = 0;
 	int cont = 1; /* always continue do loop except when set to 0 */
-	int handle_verified = 0;
 	int poll_fd;
-	unsigned int gen_first;
-	unsigned int gen_second;
 	struct message_overlay dispatch_data;
 
+	error = saHandleGet (&amfHandleDatabase, *amfHandle,
+		(void *)&amfInstance);
+	if (error != SA_OK) {
+		return (error);
+	}
+
 	/*
 	 * Timeout instantly for SA_DISPATCH_ALL
 	 */
@@ -216,26 +201,9 @@
 	}
 
 	do {
-		/*
-		 * If flags are SA_DISPATCH_BLOCKING and handle has been
-		 * verified, return SA_OK because a Finalize has been
-		 * called.  Else return error from saHandleConvert
-		 */
-		error = saHandleConvert (&amfHandleDatabase, *amfHandle, (void *)&amfInstance, AMFINSTANCE_MUTEX_OFFSET, &gen_first);
-		if (error != SA_OK) {
-			return (handle_verified ? SA_OK : error);
-		}
-		handle_verified = 1;
-
 		poll_fd = amfInstance->fd;
 
 		/*
-		 * Unlock mutex for potentially long wait in select.  If fd
-		* is closed by amfFinalize in select, select will return
-		 */
-		pthread_mutex_unlock (&amfInstance->mutex);
-		
-		/*
 		 * Read data directly from socket
 		 */
 		ufds.fd = poll_fd;
@@ -247,28 +215,27 @@
 			goto error_nounlock;
 		}
 
+		pthread_mutex_lock (&amfInstance->mutex);
+
+		/*
+		 * Handle has been finalized in another thread
+		 */
+		if (amfInstance->finalize == 1) {
+			error = SA_OK;
+			pthread_mutex_unlock (&amfInstance->mutex);
+			goto error_unlock;
+		}
+
 		dispatch_avail = ufds.revents & POLLIN;
 		if (dispatch_avail == 0 && dispatchFlags == SA_DISPATCH_ALL) {
+			pthread_mutex_unlock (&amfInstance->mutex);
 			break; /* exit do while cont is 1 loop */
 		} else
 		if (dispatch_avail == 0) {
-			continue; /* next select */
-		}
-
-		/*
-		 * Re-verify amfHandle
-		 */
-		error = saHandleConvert (&amfHandleDatabase, *amfHandle, (void *)&amfInstance, AMFINSTANCE_MUTEX_OFFSET, &gen_second);
-		if (error != SA_OK) {
-			return (handle_verified ? SA_OK : error);
+			pthread_mutex_unlock (&amfInstance->mutex);
+			continue; /* next poll */
 		}
 
-		/*
-		 * Handle has been removed and then reallocated
-		 */
-		if (gen_first != gen_second) {
-			return (SA_OK);
-		}
 		saQueueIsEmpty(&amfInstance->inq, &empty);
 		if (empty == 0) {
 			/*
@@ -276,19 +243,21 @@
 			 */
 			saQueueItemGet (&amfInstance->inq, (void *)&queue_msg);
 			msg = *queue_msg;
-			memcpy (&amfInstance->message, msg, msg->size);
+			memcpy (&dispatch_data, msg, msg->size);
 			saQueueItemRemove (&amfInstance->inq);
 		} else {
 			/*
 			 * Queue empty, read response from socket
 			 */
-			error = saRecvRetry (amfInstance->fd, &amfInstance->message.header, sizeof (struct message_header), MSG_WAITALL | MSG_NOSIGNAL);
+			error = saRecvRetry (amfInstance->fd, &dispatch_data.header,
+				sizeof (struct message_header), MSG_WAITALL | MSG_NOSIGNAL);
 			if (error != SA_OK) {
 				goto error_unlock;
 			}
-			if (amfInstance->message.header.size > sizeof (struct message_header)) {
-				error = saRecvRetry (amfInstance->fd, &amfInstance->message.data,
-					amfInstance->message.header.size - sizeof (struct message_header), MSG_WAITALL | MSG_NOSIGNAL);
+			if (dispatch_data.header.size > sizeof (struct message_header)) {
+				error = saRecvRetry (amfInstance->fd, &dispatch_data.data,
+					dispatch_data.header.size - sizeof (struct message_header),
+					MSG_WAITALL | MSG_NOSIGNAL);
 				if (error != SA_OK) {
 					goto error_unlock;
 				}
@@ -300,14 +269,13 @@
 		 * operate at the same time that amfFinalize has been called in another thread.
 		 */
 		memcpy (&callbacks, &amfInstance->callbacks, sizeof (SaAmfCallbacksT));
-		memcpy (&dispatch_data, &amfInstance->message, sizeof (struct message_overlay));
-
 		pthread_mutex_unlock (&amfInstance->mutex);
 
+
 		/*
 		 * Dispatch incoming response
 		 */
-		switch (amfInstance->message.header.id) {
+		switch (dispatch_data.header.id) {
 		case MESSAGE_RES_AMF_ACTIVATEPOLL:
 			/*
 			 * This is a do nothing message which the node executive sends
@@ -398,10 +366,8 @@
 		}
 	} while (cont);
 
-	return (error);
-
 error_unlock:
-	pthread_mutex_unlock (&amfInstance->mutex);
+	saHandlePut (&amfHandleDatabase, *amfHandle);
 error_nounlock:
 	return (error);
 }
@@ -413,21 +379,27 @@
 	struct amfInstance *amfInstance;
 	SaErrorT error;
 
-	error = saHandleConvert (&amfHandleDatabase, *amfHandle, (void *)&amfInstance, AMFINSTANCE_MUTEX_OFFSET | HANDLECONVERT_DONTUNLOCKDB, 0);
+	error = saHandleGet (&amfHandleDatabase, *amfHandle, (void *)&amfInstance);
 	if (error != SA_OK) {
 		return (error);
 	}
 
 	shutdown (amfInstance->fd, 0);
+
 	close (amfInstance->fd);
+
 	free (amfInstance->inq.items);
 
-	error = saHandleRemove (&amfHandleDatabase, *amfHandle);
+	amfInstance->finalize = 1;
 
-	pthread_mutex_unlock (&amfInstance->mutex);
+	saHandlePut (&amfHandleDatabase, *amfHandle);
+
+	saHandlePut (&amfHandleDatabase, *amfHandle);
 
-	saHandleUnlockDatabase (&amfHandleDatabase);
+	saHandleWaitNoRefs (&amfHandleDatabase, *amfHandle);
 
+	error = saHandleRemove (&amfHandleDatabase, *amfHandle);
+	
 	return (error);
 }
 
@@ -441,6 +413,7 @@
 	SaErrorT error;
 	struct req_lib_amf_componentregister req_lib_amf_componentregister;
 	struct res_lib_amf_componentregister *res_lib_amf_componentregister;
+	struct message_overlay message;
 
 	req_lib_amf_componentregister.header.magic = MESSAGE_MAGIC;
 	req_lib_amf_componentregister.header.size = sizeof (struct req_lib_amf_componentregister);
@@ -452,11 +425,13 @@
 		memset (&req_lib_amf_componentregister.proxyCompName, 0, sizeof (SaNameT));
 	}
 
-	error = saHandleConvert (&amfHandleDatabase, *amfHandle, (void *)&amfInstance, AMFINSTANCE_MUTEX_OFFSET, 0);
+	error = saHandleGet (&amfHandleDatabase, *amfHandle, (void *)&amfInstance);
 	if (error != SA_OK) {
 		return (error);
 	}
 
+	pthread_mutex_lock (&amfInstance->mutex);
+
 	error = saSendRetry (amfInstance->fd, &req_lib_amf_componentregister, sizeof (struct req_lib_amf_componentregister), MSG_NOSIGNAL);
 	if (error != SA_OK) {
 		goto error_unlock;
@@ -468,13 +443,13 @@
 	 * This must be done to avoid dropping async messages
 	 * during this sync message retrieval
 	 */
-	error = saRecvQueue (amfInstance->fd, &amfInstance->message,
+	error = saRecvQueue (amfInstance->fd, &message,
 		&amfInstance->inq, MESSAGE_RES_AMF_COMPONENTREGISTER);
 	if (error != SA_OK) {
 		goto error_unlock;
 	}
 
-	res_lib_amf_componentregister = (struct res_lib_amf_componentregister *)&amfInstance->message;
+	res_lib_amf_componentregister = (struct res_lib_amf_componentregister *)&message;
 	if (res_lib_amf_componentregister->error == SA_OK) {
 		amfInstance->compRegistered = 1;
 		memcpy (&amfInstance->compName, compName, sizeof (SaNameT));
@@ -484,6 +459,7 @@
 
 error_unlock:
 	pthread_mutex_unlock (&amfInstance->mutex);
+	saHandlePut (&amfHandleDatabase, *amfHandle);
 	return (error);
 }
 
@@ -496,6 +472,7 @@
 	struct req_lib_amf_componentunregister req_lib_amf_componentunregister;
 	struct res_lib_amf_componentunregister *res_lib_amf_componentunregister;
 	struct amfInstance *amfInstance;
+	struct message_overlay message;
 	SaErrorT error;
 
 	req_lib_amf_componentunregister.header.magic = MESSAGE_MAGIC;
@@ -508,11 +485,13 @@
 		memset (&req_lib_amf_componentunregister.proxyCompName, 0, sizeof (SaNameT));
 	}	
 
-	error = saHandleConvert (&amfHandleDatabase, *amfHandle, (void *)&amfInstance, AMFINSTANCE_MUTEX_OFFSET, 0);
+	error = saHandleGet (&amfHandleDatabase, *amfHandle, (void *)&amfInstance);
 	if (error != SA_OK) {
 		return (error);
 	}
 
+	pthread_mutex_lock (&amfInstance->mutex);
+
 	error = saSendRetry (amfInstance->fd, &req_lib_amf_componentunregister,
 		sizeof (struct req_lib_amf_componentunregister), MSG_NOSIGNAL);
 	if (error != SA_OK) {
@@ -525,12 +504,13 @@
 	 * This must be done to avoid dropping async messages
 	 * during this sync message retrieval
 	 */
-	error = saRecvQueue (amfInstance->fd, &amfInstance->message,
+	error = saRecvQueue (amfInstance->fd, &message,
 		&amfInstance->inq, MESSAGE_RES_AMF_COMPONENTUNREGISTER);
 	if (error != SA_OK) {
 		goto error_unlock;
 	}
-	res_lib_amf_componentunregister = (struct res_lib_amf_componentunregister *)&amfInstance->message;
+
+	res_lib_amf_componentunregister = (struct res_lib_amf_componentunregister *)&message;
 	if (res_lib_amf_componentunregister->error == SA_OK) {
 		amfInstance->compRegistered = 0;
 	}
@@ -538,6 +518,7 @@
 
 error_unlock:
 	pthread_mutex_unlock (&amfInstance->mutex);
+	saHandlePut (&amfHandleDatabase, *amfHandle);
 	return (error);
 }
 
@@ -549,7 +530,7 @@
 	struct amfInstance *amfInstance;
 	SaErrorT error;
 
-	error = saHandleConvert (&amfHandleDatabase, *amfHandle, (void *)&amfInstance, AMFINSTANCE_MUTEX_OFFSET, 0);
+	error = saHandleGet (&amfHandleDatabase, *amfHandle, (void *)&amfInstance);
 	if (error != SA_OK) {
 		return (error);
 	}
@@ -559,7 +540,7 @@
 	}
 	memcpy (compName, &amfInstance->compName, sizeof (SaNameT));
 
-	pthread_mutex_unlock (&amfInstance->mutex);
+	saHandlePut (&amfHandleDatabase, *amfHandle);
 	return (SA_OK);
 }
 
@@ -686,6 +667,7 @@
 	struct amfInstance *amfInstance;
 	struct req_amf_protectiongrouptrackstart req_amf_protectiongrouptrackstart;
 	struct res_amf_protectiongrouptrackstart *res_amf_protectiongrouptrackstart;
+	struct message_overlay message;
 	SaErrorT error;
 
 	req_amf_protectiongrouptrackstart.header.magic = MESSAGE_MAGIC;
@@ -696,33 +678,29 @@
 	req_amf_protectiongrouptrackstart.notificationBufferAddress = (SaAmfProtectionGroupNotificationT *)notificationBuffer;
 	req_amf_protectiongrouptrackstart.numberOfItems = numberOfItems;
 
-	error = saHandleConvert (&amfHandleDatabase, *amfHandle, (void *)&amfInstance, AMFINSTANCE_MUTEX_OFFSET, 0);
+	error = saHandleGet (&amfHandleDatabase, *amfHandle, (void *)&amfInstance);
 	if (error != SA_OK) {
 		return (error);
 	}
 
+	pthread_mutex_lock (&amfInstance->mutex);
+
 	error = saSendRetry (amfInstance->fd, &req_amf_protectiongrouptrackstart,
 		sizeof (struct req_amf_protectiongrouptrackstart), MSG_NOSIGNAL);
 	if (error != SA_OK) {
 		goto error_unlock;
 	}
 
-	error = saRecvQueue (amfInstance->fd, &amfInstance->message,
+	error = saRecvQueue (amfInstance->fd, &message,
 		&amfInstance->inq, MESSAGE_RES_AMF_PROTECTIONGROUPTRACKSTART);
 
-	pthread_mutex_unlock (&amfInstance->mutex);
-
-	res_amf_protectiongrouptrackstart = (struct res_amf_protectiongrouptrackstart *)&amfInstance->message;
+	res_amf_protectiongrouptrackstart = (struct res_amf_protectiongrouptrackstart *)&message;
 
-	if (error == SA_OK) {
-		return (res_amf_protectiongrouptrackstart->error);
-	}
-
-	return (error);
+	error = res_amf_protectiongrouptrackstart->error;
 
 error_unlock:
 	pthread_mutex_unlock (&amfInstance->mutex);
-
+	saHandlePut (&amfHandleDatabase, *amfHandle);
 	return (error);
 }
 
@@ -734,6 +712,7 @@
 	struct amfInstance *amfInstance;
 	struct req_amf_protectiongrouptrackstop req_amf_protectiongrouptrackstop;
 	struct res_amf_protectiongrouptrackstop *res_amf_protectiongrouptrackstop;
+	struct message_overlay message;
 	SaErrorT error;
 
 	req_amf_protectiongrouptrackstop.header.magic = MESSAGE_MAGIC;
@@ -741,32 +720,31 @@
 	req_amf_protectiongrouptrackstop.header.id = MESSAGE_REQ_AMF_PROTECTIONGROUPTRACKSTOP;
 	memcpy (&req_amf_protectiongrouptrackstop.csiName, csiName, sizeof (SaNameT));
 
-	error = saHandleConvert (&amfHandleDatabase, *amfHandle, (void *)&amfInstance, AMFINSTANCE_MUTEX_OFFSET, 0);
+	error = saHandleGet (&amfHandleDatabase, *amfHandle, (void *)&amfInstance);
 	if (error != SA_OK) {
 		return (error);
 	}
 
+	pthread_mutex_lock (&amfInstance->mutex);
+
 	error = saSendRetry (amfInstance->fd, &req_amf_protectiongrouptrackstop,
 		sizeof (struct req_amf_protectiongrouptrackstop), MSG_NOSIGNAL);
 	if (error != SA_OK) {
 		goto error_unlock;
 	}
 
-	error = saRecvQueue (amfInstance->fd, &amfInstance->message,
+	error = saRecvQueue (amfInstance->fd, &message,
 		&amfInstance->inq, MESSAGE_RES_AMF_PROTECTIONGROUPTRACKSTOP);
 
-	pthread_mutex_unlock (&amfInstance->mutex);
-
-	res_amf_protectiongrouptrackstop = (struct res_amf_protectiongrouptrackstop *)&amfInstance->message;
+	saHandlePut (&amfHandleDatabase, *amfHandle);
 
-	if (error == SA_OK) {
-		return (res_amf_protectiongrouptrackstop->error);
-	}
+	res_amf_protectiongrouptrackstop = (struct res_amf_protectiongrouptrackstop *)&message;
 
-	return (error);
+	error = res_amf_protectiongrouptrackstop->error;
 
 error_unlock:
 	pthread_mutex_unlock (&amfInstance->mutex);
+	saHandlePut (&amfHandleDatabase, *amfHandle);
 	return (error);
 }
 
--- openais-cond-orig/lib/ckpt.c	2004-06-28 18:30:33.000000000 -0700
+++ openais-cond/lib/ckpt.c	2004-06-28 18:43:54.000000000 -0700
@@ -62,58 +62,62 @@
 	int fd;
 	struct queue inq;
 	SaCkptCallbacksT callbacks;
+	int finalize;
 	pthread_mutex_t mutex;
+	struct saRefCount refCount;
 };
-#define CKPTINSTANCE_MUTEX_OFFSET HANDLECONVERT_NOLOCKING
+#define CKPTINSTANCE_REFCOUNT_OFFSET offset_of(struct ckptInstance, refCount)
 
 struct ckptCheckpointInstance {
 	int fd;
 	SaNameT checkpointName;
 	SaUint32T maxSectionIdSize;
+	int finalize;
 	pthread_mutex_t mutex;
+	struct saRefCount refCount;
 };
-//#define CKPTCHECKPOINTINSTANCE_MUTEX_OFFSET offset_of(struct ckptCheckpointInstance, mutex)
-#define CKPTCHECKPOINTINSTANCE_MUTEX_OFFSET_DEMO offset_of(struct ckptCheckpointInstance, mutex)
-#define CKPTCHECKPOINTINSTANCE_MUTEX_OFFSET HANDLECONVERT_NOLOCKING
+#define CKPTCHECKPOINTINSTANCE_REFCOUNT_OFFSET offset_of(struct ckptCheckpointInstance, refCount)
 
 struct ckptSectionIteratorInstance {
 	int fd;
 	struct list_head sectionIdListHead;
 	SaUint32T maxSectionIdSize;
+	int finalize;
 	pthread_mutex_t mutex;
+	struct saRefCount refCount;
 };
 
-//#define CKPTSECTIONITERATORINSTANCE_MUTEX_OFFSET offset_of(struct ckptSectionIteratorInstance, mutex)
-#define CKPTSECTIONITERATORINSTANCE_MUTEX_OFFSET HANDLECONVERT_NOLOCKING
+#define CKPTSECTIONITERATORINSTANCE_REFCOUNT_OFFSET offset_of(struct ckptSectionIteratorInstance, refCount)
 
 /*
  * All CKPT instances in this database
  */
 static struct saHandleDatabase ckptHandleDatabase = {
-    handleCount: 0,
-    handles: 0,
-    generation: 0,
-    mutex: PTHREAD_MUTEX_INITIALIZER
+	handleCount: 0,
+	handles: 0,
+	mutex: PTHREAD_MUTEX_INITIALIZER,
+	offsetToRefCount: CKPTINSTANCE_REFCOUNT_OFFSET
+
 };
 
 /*
  *  All Checkpoint instances in this database
  */
 static struct saHandleDatabase ckptCheckpointHandleDatabase = {
-    handleCount: 0,
-    handles: 0,
-    generation: 0,
-    mutex: PTHREAD_MUTEX_INITIALIZER
+	handleCount: 0,
+	handles: 0,
+	mutex: PTHREAD_MUTEX_INITIALIZER,
+	offsetToRefCount: CKPTCHECKPOINTINSTANCE_REFCOUNT_OFFSET
 };
 
 /*
  * All section iterators in this database
  */
 static struct saHandleDatabase ckptSectionIteratorHandleDatabase = {
-    handleCount: 0,
-    handles: 0,
-    generation: 0,
-    mutex: PTHREAD_MUTEX_INITIALIZER
+	handleCount: 0,
+	handles: 0,
+	mutex: PTHREAD_MUTEX_INITIALIZER,
+	offsetToRefCount: CKPTSECTIONITERATORINSTANCE_REFCOUNT_OFFSET
 };
 
 /*
@@ -192,12 +196,15 @@
 	struct ckptInstance *ckptInstance;
 	SaErrorT error;
 
-	error = saHandleConvert (&ckptHandleDatabase, *ckptHandle, (void *)&ckptInstance, CKPTINSTANCE_MUTEX_OFFSET, 0);
+	error = saHandleGet (&ckptHandleDatabase, *ckptHandle, (void *)&ckptInstance);
 	if (error != SA_OK) {
 		return (error);
 	}
 
 	*selectionObject = ckptInstance->fd;
+
+	saHandlePut (&ckptHandleDatabase, *ckptHandle);
+
 	return (SA_OK);
 }
 
@@ -218,7 +225,7 @@
 	int ignore_dispatch = 0;
 	int cont = 1; /* always continue do loop except when set to 0 */
 
-	error = saHandleConvert (&ckptHandleDatabase, *ckptHandle, (void *)&ckptInstance, CKPTINSTANCE_MUTEX_OFFSET, 0);
+	error = saHandleGet (&ckptHandleDatabase, *ckptHandle, (void *)&ckptInstance);
 	if (error != SA_OK) {
 		return (error);
 	}
@@ -382,14 +389,27 @@
 	struct ckptInstance *ckptInstance;
 	SaErrorT error;
 
-	error = saHandleConvert (&ckptHandleDatabase, *ckptHandle, (void *)&ckptInstance, CKPTINSTANCE_MUTEX_OFFSET, 0);
+	error = saHandleGet (&ckptHandleDatabase, *ckptHandle, (void *)&ckptInstance);
 	if (error != SA_OK) {
 		return (error);
 	}
 
+	shutdown (ckptInstance->fd, 0);
+
 	close (ckptInstance->fd);
+
 	free (ckptInstance->inq.items);
+
+	ckptInstance->finalize = 1;
+
+	saHandlePut (&ckptHandleDatabase, *ckptHandle);
+
+	saHandlePut (&ckptHandleDatabase, *ckptHandle);
+	
+	saHandleWaitNoRefs (&ckptHandleDatabase, *ckptHandle);
+
 	saHandleRemove (&ckptHandleDatabase, *ckptHandle);
+
 	return (SA_OK);
 }
 
@@ -420,8 +440,6 @@
 		goto error_free;
 	}
 
-	pthread_mutex_init (&ckptCheckpointInstance->mutex, NULL);
-
 	req_lib_ckpt_checkpointopen.header.magic = MESSAGE_MAGIC;
 	req_lib_ckpt_checkpointopen.header.size = sizeof (struct req_lib_ckpt_checkpointopen);
 	req_lib_ckpt_checkpointopen.header.id = MESSAGE_REQ_CKPT_CHECKPOINT_CHECKPOINTOPEN;
@@ -472,7 +490,7 @@
 	SaErrorT error;
 	struct req_lib_ckpt_checkpointopenasync req_lib_ckpt_checkpointopenasync;
 
-	error = saHandleConvert (&ckptHandleDatabase, *ckptHandle, (void *)&ckptInstance, CKPTINSTANCE_MUTEX_OFFSET, 0);
+	error = saHandleGet (&ckptHandleDatabase, *ckptHandle, (void *)&ckptInstance);
 	if (error != SA_OK) {
 		return (error);
 	}
@@ -488,9 +506,15 @@
 	
 	req_lib_ckpt_checkpointopenasync.checkpointOpenFlags = checkpointOpenFlags;
 
+	pthread_mutex_lock (&ckptInstance->mutex);
+
         error = saSendRetry (ckptInstance->fd, &req_lib_ckpt_checkpointopenasync,
 		sizeof (struct req_lib_ckpt_checkpointopenasync), MSG_NOSIGNAL);
 
+	pthread_mutex_unlock (&ckptInstance->mutex);
+
+	saHandlePut (&ckptHandleDatabase, *ckptHandle);
+
 	return (error);
 }
 
@@ -501,13 +525,24 @@
 	SaErrorT error;
 	struct ckptCheckpointInstance *ckptCheckpointInstance;
 
-	error = saHandleConvert (&ckptCheckpointHandleDatabase, *checkpointHandle,
-		(void *)&ckptCheckpointInstance, CKPTCHECKPOINTINSTANCE_MUTEX_OFFSET, 0);
+	error = saHandleGet (&ckptCheckpointHandleDatabase, *checkpointHandle,
+		(void *)&ckptCheckpointInstance);
 	if (error != SA_OK) {
 		goto error_exit;
 	}
 
+	shutdown (ckptCheckpointInstance->fd, 0);
+
 	close (ckptCheckpointInstance->fd);
+
+	ckptCheckpointInstance->finalize = 1;
+
+	saHandlePut (&ckptCheckpointHandleDatabase, *checkpointHandle);
+
+	saHandlePut (&ckptCheckpointHandleDatabase, *checkpointHandle);
+
+	saHandleWaitNoRefs (&ckptCheckpointHandleDatabase, *checkpointHandle);
+
 	error = saHandleRemove (&ckptCheckpointHandleDatabase, *checkpointHandle);
 
 error_exit:
@@ -558,7 +593,8 @@
 	struct ckptCheckpointInstance *ckptCheckpointInstance;
 	struct req_lib_ckpt_checkpointretentiondurationset req_lib_ckpt_checkpointretentiondurationset;
 
-	error = saHandleConvert (&ckptCheckpointHandleDatabase, *checkpointHandle, (void *)&ckptCheckpointInstance, CKPTCHECKPOINTINSTANCE_MUTEX_OFFSET, 0);
+	error = saHandleGet (&ckptCheckpointHandleDatabase, *checkpointHandle,
+		(void *)&ckptCheckpointInstance);
 	if (error != SA_OK) {
 		goto error_exit;
 	}
@@ -567,10 +603,13 @@
 	req_lib_ckpt_checkpointretentiondurationset.header.size = sizeof (struct req_lib_ckpt_checkpointretentiondurationset);
 	req_lib_ckpt_checkpointretentiondurationset.header.id = MESSAGE_REQ_CKPT_CHECKPOINT_CHECKPOINTRETENTIONDURATIONSET;
 
+	pthread_mutex_lock (&ckptCheckpointInstance->mutex);
+
 	error = saSendRetry (ckptCheckpointInstance->fd, &req_lib_ckpt_checkpointretentiondurationset, sizeof (struct req_lib_ckpt_checkpointretentiondurationset), MSG_NOSIGNAL);
-	if (error != SA_OK) {
-		goto error_exit;
-	}
+
+	pthread_mutex_unlock (&ckptCheckpointInstance->mutex);
+
+	saHandlePut (&ckptCheckpointHandleDatabase, *checkpointHandle);
 
 error_exit:
 	return (error);
@@ -585,7 +624,8 @@
 	struct req_lib_ckpt_activecheckpointset req_lib_ckpt_activecheckpointset;
 	struct res_lib_ckpt_activecheckpointset res_lib_ckpt_activecheckpointset;
 
-	error = saHandleConvert (&ckptCheckpointHandleDatabase, *checkpointHandle, (void *)&ckptCheckpointInstance, CKPTCHECKPOINTINSTANCE_MUTEX_OFFSET, 0);
+	error = saHandleGet (&ckptCheckpointHandleDatabase, *checkpointHandle,
+		 (void *)&ckptCheckpointInstance);
 	if (error != SA_OK) {
 		goto error_exit;
 	}
@@ -594,13 +634,21 @@
 	req_lib_ckpt_activecheckpointset.header.size = sizeof (struct req_lib_ckpt_activecheckpointset);
 	req_lib_ckpt_activecheckpointset.header.id = MESSAGE_REQ_CKPT_CHECKPOINT_ACTIVECHECKPOINTSET;
 
+	pthread_mutex_lock (&ckptCheckpointInstance->mutex);
+
 	error = saSendRetry (ckptCheckpointInstance->fd, &req_lib_ckpt_activecheckpointset,
 		sizeof (struct req_lib_ckpt_activecheckpointset), MSG_NOSIGNAL);
 	if (error != SA_OK) {
 		goto error_exit;
 	}
 
-	error = saRecvQueue (ckptCheckpointInstance->fd, &res_lib_ckpt_activecheckpointset, 0, MESSAGE_RES_CKPT_CHECKPOINT_ACTIVECHECKPOINTSET);
+	error = saRecvQueue (ckptCheckpointInstance->fd,
+		&res_lib_ckpt_activecheckpointset, 0,
+		MESSAGE_RES_CKPT_CHECKPOINT_ACTIVECHECKPOINTSET);
+
+	pthread_mutex_unlock (&ckptCheckpointInstance->mutex);
+
+	saHandlePut (&ckptCheckpointHandleDatabase, *checkpointHandle);
 
 error_exit:
 	return (error == SA_OK ? res_lib_ckpt_activecheckpointset.error : error);
@@ -616,16 +664,18 @@
 	struct req_lib_ckpt_checkpointstatusget req_lib_ckpt_checkpointstatusget;
 	struct res_lib_ckpt_checkpointstatusget res_lib_ckpt_checkpointstatusget;
 
-	error = saHandleConvert (&ckptCheckpointHandleDatabase, *checkpointHandle,
-		(void *)&ckptCheckpointInstance, CKPTCHECKPOINTINSTANCE_MUTEX_OFFSET, 0);
+	error = saHandleGet (&ckptCheckpointHandleDatabase, *checkpointHandle,
+		(void *)&ckptCheckpointInstance);
 	if (error != SA_OK) {
-		goto error_exit;
+		return (error);
 	}
 
 	req_lib_ckpt_checkpointstatusget.header.magic = MESSAGE_MAGIC;
 	req_lib_ckpt_checkpointstatusget.header.size = sizeof (struct req_lib_ckpt_checkpointstatusget);
 	req_lib_ckpt_checkpointstatusget.header.id = MESSAGE_REQ_CKPT_CHECKPOINT_CHECKPOINTSTATUSGET;
 
+	pthread_mutex_lock (&ckptCheckpointInstance->mutex);
+
 	error = saSendRetry (ckptCheckpointInstance->fd, &req_lib_ckpt_checkpointstatusget,
 		sizeof (struct req_lib_ckpt_checkpointstatusget), MSG_NOSIGNAL);
 	if (error != SA_OK) {
@@ -638,11 +688,14 @@
 		goto error_exit;
 	}
 
+	pthread_mutex_unlock (&ckptCheckpointInstance->mutex);
+
 	memcpy (checkpointStatus,
 		&res_lib_ckpt_checkpointstatusget.checkpointStatus,
 		sizeof (SaCkptCheckpointStatusT));
 
 error_exit:
+	saHandlePut (&ckptCheckpointHandleDatabase, *checkpointHandle);
 	return (error);
 }
 
@@ -658,10 +711,10 @@
 	struct req_lib_ckpt_sectioncreate req_lib_ckpt_sectioncreate;
 	struct res_lib_ckpt_sectioncreate res_lib_ckpt_sectioncreate;
 
-	error = saHandleConvert (&ckptCheckpointHandleDatabase, *checkpointHandle,
-		(void *)&ckptCheckpointInstance, CKPTCHECKPOINTINSTANCE_MUTEX_OFFSET, 0);
+	error = saHandleGet (&ckptCheckpointHandleDatabase, *checkpointHandle,
+		(void *)&ckptCheckpointInstance);
 	if (error != SA_OK) {
-		goto error_exit;
+		return (error);
 	}
 
 	req_lib_ckpt_sectioncreate.header.magic = MESSAGE_MAGIC;
@@ -677,6 +730,8 @@
 	req_lib_ckpt_sectioncreate.initialDataSize = initialDataSize;
 
 
+	pthread_mutex_lock (&ckptCheckpointInstance->mutex);
+
 	error = saSendRetry (ckptCheckpointInstance->fd, &req_lib_ckpt_sectioncreate,
 		sizeof (struct req_lib_ckpt_sectioncreate), MSG_NOSIGNAL);
 	if (error != SA_OK) {
@@ -698,9 +753,14 @@
 		goto error_exit;
 	}
 
-	error = saRecvQueue (ckptCheckpointInstance->fd, &res_lib_ckpt_sectioncreate, 0, MESSAGE_RES_CKPT_CHECKPOINT_SECTIONCREATE);
+	error = saRecvQueue (ckptCheckpointInstance->fd,
+		&res_lib_ckpt_sectioncreate, 0,
+		MESSAGE_RES_CKPT_CHECKPOINT_SECTIONCREATE);
+
+	pthread_mutex_unlock (&ckptCheckpointInstance->mutex);
 
 error_exit:
+	saHandlePut (&ckptCheckpointHandleDatabase, *checkpointHandle);
 	return (error == SA_OK ? res_lib_ckpt_sectioncreate.error : error);
 }
 
@@ -715,12 +775,14 @@
 	struct req_lib_ckpt_sectiondelete req_lib_ckpt_sectiondelete;
 	struct res_lib_ckpt_sectiondelete res_lib_ckpt_sectiondelete;
 
-	error = saHandleConvert (&ckptCheckpointHandleDatabase, *checkpointHandle,
-		(void *)&ckptCheckpointInstance, CKPTCHECKPOINTINSTANCE_MUTEX_OFFSET, 0);
+	error = saHandleGet (&ckptCheckpointHandleDatabase, *checkpointHandle,
+		(void *)&ckptCheckpointInstance);
 	if (error != SA_OK) {
-		goto error_exit;
+		return (error);
 	}
 
+	pthread_mutex_lock (&ckptCheckpointInstance->mutex);
+
 	req_lib_ckpt_sectiondelete.header.magic = MESSAGE_MAGIC;
 	req_lib_ckpt_sectiondelete.header.size = sizeof (struct req_lib_ckpt_sectiondelete) + sectionId->idLen; 
 	req_lib_ckpt_sectiondelete.header.id = MESSAGE_REQ_CKPT_CHECKPOINT_SECTIONDELETE;
@@ -740,9 +802,14 @@
 	if (error != SA_OK) {
 		goto error_exit;
 	}
-	error = saRecvQueue (ckptCheckpointInstance->fd, &res_lib_ckpt_sectiondelete, 0, MESSAGE_RES_CKPT_CHECKPOINT_SECTIONDELETE);
+	error = saRecvQueue (ckptCheckpointInstance->fd,
+		&res_lib_ckpt_sectiondelete, 0,
+		MESSAGE_RES_CKPT_CHECKPOINT_SECTIONDELETE);
+
+	pthread_mutex_unlock (&ckptCheckpointInstance->mutex);
 
 error_exit:
+	saHandlePut (&ckptCheckpointHandleDatabase, *checkpointHandle);
 	return (error == SA_OK ? res_lib_ckpt_sectiondelete.error : error);
 }
 
@@ -757,8 +824,8 @@
 	struct req_lib_ckpt_sectionexpirationtimeset req_lib_ckpt_sectionexpirationtimeset;
 	struct res_lib_ckpt_sectionexpirationtimeset res_lib_ckpt_sectionexpirationtimeset;
 
-	error = saHandleConvert (&ckptCheckpointHandleDatabase, *checkpointHandle,
-		(void *)&ckptCheckpointInstance, CKPTCHECKPOINTINSTANCE_MUTEX_OFFSET, 0);
+	error = saHandleGet (&ckptCheckpointHandleDatabase, *checkpointHandle,
+		(void *)&ckptCheckpointInstance);
 	if (error != SA_OK) {
 		goto error_exit;
 	}
@@ -769,6 +836,8 @@
 	req_lib_ckpt_sectionexpirationtimeset.idLen = sectionId->idLen;
 	req_lib_ckpt_sectionexpirationtimeset.expirationTime = expirationTime;
 
+	pthread_mutex_lock (&ckptCheckpointInstance->mutex);
+
 	error = saSendRetry (ckptCheckpointInstance->fd, &req_lib_ckpt_sectionexpirationtimeset,
 		sizeof (struct req_lib_ckpt_sectionexpirationtimeset), MSG_NOSIGNAL);
 	if (error != SA_OK) {
@@ -786,10 +855,14 @@
 		}
 	}
 
-	error = saRecvQueue (ckptCheckpointInstance->fd, &res_lib_ckpt_sectionexpirationtimeset,
+	error = saRecvQueue (ckptCheckpointInstance->fd,
+		&res_lib_ckpt_sectionexpirationtimeset,
 		0, MESSAGE_RES_CKPT_CHECKPOINT_SECTIONEXPIRATIONTIMESET);
 
+	pthread_mutex_unlock (&ckptCheckpointInstance->mutex);
+
 error_exit:
+	saHandlePut (&ckptCheckpointHandleDatabase, *checkpointHandle);
 	return (error == SA_OK ? res_lib_ckpt_sectionexpirationtimeset.error : error);
 }
 
@@ -806,8 +879,8 @@
 	struct req_lib_ckpt_sectioniteratorinitialize req_lib_ckpt_sectioniteratorinitialize;
 	struct res_lib_ckpt_sectioniteratorinitialize res_lib_ckpt_sectioniteratorinitialize;
 
-	error = saHandleConvert (&ckptCheckpointHandleDatabase, *checkpointHandle,
-		(void *)&ckptCheckpointInstance, CKPTCHECKPOINTINSTANCE_MUTEX_OFFSET, 0);
+	error = saHandleGet (&ckptCheckpointHandleDatabase, *checkpointHandle,
+		(void *)&ckptCheckpointInstance);
 	if (error != SA_OK) {
 		goto error_exit;
 	}
@@ -818,6 +891,9 @@
 	if (error != SA_OK) {
 		goto error_exit;
 	}
+
+	pthread_mutex_init (&ckptSectionIteratorInstance->mutex, NULL);
+
 	/*
 	 * Setup section id list for iterator next
 	 */
@@ -832,8 +908,6 @@
 		goto error_remove;
 	}
 
-	pthread_mutex_init (&ckptSectionIteratorInstance->mutex, NULL);
-
 	req_lib_ckpt_sectioniteratorinitialize.header.magic = MESSAGE_MAGIC;
 	req_lib_ckpt_sectioniteratorinitialize.header.size = sizeof (struct req_lib_ckpt_sectioniteratorinitialize); 
 	req_lib_ckpt_sectioniteratorinitialize.header.id = MESSAGE_REQ_CKPT_SECTIONITERATOR_SECTIONITERATORINITIALIZE;
@@ -842,8 +916,12 @@
 	memcpy (&req_lib_ckpt_sectioniteratorinitialize.checkpointName,
 		&ckptCheckpointInstance->checkpointName, sizeof (SaNameT));
 
+	pthread_mutex_lock (&ckptSectionIteratorInstance->mutex);
+
 	error = saSendRetry (ckptSectionIteratorInstance->fd,
-		&req_lib_ckpt_sectioniteratorinitialize, sizeof (struct req_lib_ckpt_sectioniteratorinitialize), MSG_NOSIGNAL);
+		&req_lib_ckpt_sectioniteratorinitialize,
+		sizeof (struct req_lib_ckpt_sectioniteratorinitialize),
+		MSG_NOSIGNAL);
 
 	if (error != SA_OK) {
 		goto error_close;
@@ -853,10 +931,17 @@
 		&res_lib_ckpt_sectioniteratorinitialize, 0,
 		MESSAGE_RES_CKPT_SECTIONITERATOR_SECTIONITERATORINITIALIZE);
 
+	pthread_mutex_unlock (&ckptSectionIteratorInstance->mutex);
+
 error_exit:
+	saHandlePut (&ckptCheckpointHandleDatabase, *checkpointHandle);
 	return (error == SA_OK ? res_lib_ckpt_sectioniteratorinitialize.error : error);
+
 error_close:
+	saHandlePut (&ckptCheckpointHandleDatabase, *checkpointHandle);
+
 	close (ckptSectionIteratorInstance->fd);
+
 error_remove:
 	saHandleRemove (&ckptSectionIteratorHandleDatabase, *sectionIterator);
 	return (error);
@@ -878,9 +963,8 @@
 	struct res_lib_ckpt_sectioniteratornext res_lib_ckpt_sectioniteratornext;
 	struct iteratorSectionIdListEntry *iteratorSectionIdListEntry;
 
-	error = saHandleConvert (&ckptSectionIteratorHandleDatabase, *sectionIterator,
-		(void *)&ckptSectionIteratorInstance,
-		CKPTSECTIONITERATORINSTANCE_MUTEX_OFFSET, 0);
+	error = saHandleGet (&ckptSectionIteratorHandleDatabase, *sectionIterator,
+		(void *)&ckptSectionIteratorInstance);
 	if (error != SA_OK) {
 		goto error_exit;
 	}
@@ -898,6 +982,8 @@
 	req_lib_ckpt_sectioniteratornext.header.size = sizeof (struct req_lib_ckpt_sectioniteratornext); 
 	req_lib_ckpt_sectioniteratornext.header.id = MESSAGE_REQ_CKPT_SECTIONITERATOR_SECTIONITERATORNEXT;
 
+	pthread_mutex_lock (&ckptSectionIteratorInstance->mutex);
+
 	error = saSendRetry (ckptSectionIteratorInstance->fd,
 		&req_lib_ckpt_sectioniteratornext,
 		sizeof (struct req_lib_ckpt_sectioniteratornext), MSG_NOSIGNAL);
@@ -934,7 +1020,9 @@
 		list_add (&iteratorSectionIdListEntry->list, &ckptSectionIteratorInstance->sectionIdListHead);
 	}
 
+	pthread_mutex_unlock (&ckptSectionIteratorInstance->mutex);
 error_exit:
+	saHandlePut (&ckptSectionIteratorHandleDatabase, *sectionIterator);
 	return (error == SA_OK ? res_lib_ckpt_sectioniteratornext.error : error);
 }
 	
@@ -948,9 +1036,8 @@
 	struct list_head *sectionIdIteratorList;
 	struct list_head *sectionIdIteratorListNext;
 
-	error = saHandleConvert (&ckptSectionIteratorHandleDatabase, *sectionIterator,
-		(void *)&ckptSectionIteratorInstance,
-		CKPTSECTIONITERATORINSTANCE_MUTEX_OFFSET, 0);
+	error = saHandleGet (&ckptSectionIteratorHandleDatabase, *sectionIterator,
+		(void *)&ckptSectionIteratorInstance);
 	if (error != SA_OK) {
 		goto error_exit;
 	}
@@ -973,6 +1060,15 @@
 		free (iteratorSectionIdListEntry);
 	}
 
+	saHandlePut (&ckptSectionIteratorHandleDatabase,
+		*sectionIterator);
+
+	saHandlePut (&ckptSectionIteratorHandleDatabase,
+		*sectionIterator);
+
+	saHandleWaitNoRefs (&ckptSectionIteratorHandleDatabase,
+		*sectionIterator);
+
 	saHandleRemove (&ckptSectionIteratorHandleDatabase, *sectionIterator);
 
 error_exit:
@@ -994,16 +1090,18 @@
 	struct iovec iov[3];
 	int iov_len = 0;
 
-	error = saHandleConvert (&ckptCheckpointHandleDatabase, *checkpointHandle,
-		(void *)&ckptCheckpointInstance, CKPTCHECKPOINTINSTANCE_MUTEX_OFFSET_DEMO, 0);
+	error = saHandleGet (&ckptCheckpointHandleDatabase, *checkpointHandle,
+		(void *)&ckptCheckpointInstance);
 	if (error != SA_OK) {
-		goto error_exit;
+		return (error);
 	}
 
 	req_lib_ckpt_sectionwrite.header.magic = MESSAGE_MAGIC;
 	req_lib_ckpt_sectionwrite.header.id = MESSAGE_REQ_CKPT_CHECKPOINT_SECTIONWRITE;
 
 
+	pthread_mutex_lock (&ckptCheckpointInstance->mutex);
+
 	for (i = 0; i < numberOfElements; i++) {
 
 		req_lib_ckpt_sectionwrite.header.size = sizeof (struct req_lib_ckpt_sectionwrite) + ioVector[i].sectionId.idLen + ioVector[i].dataSize; 
@@ -1050,6 +1148,10 @@
 
 error_exit:
 	pthread_mutex_unlock (&ckptCheckpointInstance->mutex);
+
+	saHandlePut (&ckptCheckpointHandleDatabase,
+		*checkpointHandle);
+
 	return (error == SA_OK ? res_lib_ckpt_sectionwrite.error : error);
 }
 
@@ -1065,10 +1167,10 @@
 	struct req_lib_ckpt_sectionoverwrite req_lib_ckpt_sectionoverwrite;
 	struct res_lib_ckpt_sectionoverwrite res_lib_ckpt_sectionoverwrite;
 
-	error = saHandleConvert (&ckptCheckpointHandleDatabase, *checkpointHandle,
-		(void *)&ckptCheckpointInstance, CKPTCHECKPOINTINSTANCE_MUTEX_OFFSET, 0);
+	error = saHandleGet (&ckptCheckpointHandleDatabase, *checkpointHandle,
+		(void *)&ckptCheckpointInstance);
 	if (error != SA_OK) {
-		goto error_exit;
+		return (error);
 	}
 
 	req_lib_ckpt_sectionoverwrite.header.magic = MESSAGE_MAGIC;
@@ -1077,6 +1179,8 @@
 	req_lib_ckpt_sectionoverwrite.idLen = sectionId->idLen;
 	req_lib_ckpt_sectionoverwrite.dataSize = dataSize;
 	
+	pthread_mutex_lock (&ckptCheckpointInstance->mutex);
+
 	error = saSendRetry (ckptCheckpointInstance->fd, &req_lib_ckpt_sectionoverwrite,
 		sizeof (struct req_lib_ckpt_sectionoverwrite), MSG_NOSIGNAL);
 	if (error != SA_OK) {
@@ -1099,6 +1203,11 @@
 		&res_lib_ckpt_sectionoverwrite, 0, MESSAGE_RES_CKPT_CHECKPOINT_SECTIONOVERWRITE);
 
 error_exit:
+	pthread_mutex_unlock (&ckptCheckpointInstance->mutex);
+
+	saHandlePut (&ckptCheckpointHandleDatabase,
+		*checkpointHandle);
+
 	return (error == SA_OK ? res_lib_ckpt_sectionoverwrite.error : error);
 }
 
@@ -1117,15 +1226,17 @@
 	int i;
 	struct iovec iov[3];
 
-	error = saHandleConvert (&ckptCheckpointHandleDatabase, *checkpointHandle,
-		(void *)&ckptCheckpointInstance, CKPTCHECKPOINTINSTANCE_MUTEX_OFFSET_DEMO, 0);
+	error = saHandleGet (&ckptCheckpointHandleDatabase, *checkpointHandle,
+		(void *)&ckptCheckpointInstance);
 	if (error != SA_OK) {
-		goto error_exit;
+		return (error);
 	}
 
 	req_lib_ckpt_sectionread.header.magic = MESSAGE_MAGIC;
 	req_lib_ckpt_sectionread.header.id = MESSAGE_REQ_CKPT_CHECKPOINT_SECTIONREAD;
 
+	pthread_mutex_lock (&ckptCheckpointInstance->mutex);
+
 	for (i = 0; i < numberOfElements; i++) {
 		req_lib_ckpt_sectionread.header.size = sizeof (struct req_lib_ckpt_sectionread) +
 			ioVector[i].sectionId.idLen;
@@ -1179,6 +1290,10 @@
 
 error_exit:
 	pthread_mutex_unlock (&ckptCheckpointInstance->mutex);
+
+	saHandlePut (&ckptCheckpointHandleDatabase,
+		*checkpointHandle);
+
 	return (error == SA_OK ? res_lib_ckpt_sectionread.error : error);
 }
 
@@ -1192,16 +1307,18 @@
 	struct req_lib_ckpt_checkpointsynchronize req_lib_ckpt_checkpointsynchronize;
 	struct res_lib_ckpt_checkpointsynchronize res_lib_ckpt_checkpointsynchronize;
 
-	error = saHandleConvert (&ckptCheckpointHandleDatabase, *checkpointHandle,
-		(void *)&ckptCheckpointInstance, CKPTCHECKPOINTINSTANCE_MUTEX_OFFSET, 0);
+	error = saHandleGet (&ckptCheckpointHandleDatabase, *checkpointHandle,
+		(void *)&ckptCheckpointInstance);
 	if (error != SA_OK) {
-		goto error_exit;
+		return (error);
 	}
 
 	req_lib_ckpt_checkpointsynchronize.header.magic = MESSAGE_MAGIC;
 	req_lib_ckpt_checkpointsynchronize.header.size = sizeof (struct req_lib_ckpt_checkpointsynchronize); 
 	req_lib_ckpt_checkpointsynchronize.header.id = MESSAGE_REQ_CKPT_CHECKPOINT_CHECKPOINTSYNCHRONIZE;
 
+	pthread_mutex_lock (&ckptCheckpointInstance->mutex);
+
 	error = saSendRetry (ckptCheckpointInstance->fd, &req_lib_ckpt_checkpointsynchronize,
 		sizeof (struct req_lib_ckpt_checkpointsynchronize), MSG_NOSIGNAL);
 
@@ -1213,6 +1330,11 @@
 		0, MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTSYNCHRONIZE);
 
 error_exit:
+	pthread_mutex_unlock (&ckptCheckpointInstance->mutex);
+
+	saHandlePut (&ckptCheckpointHandleDatabase,
+		*checkpointHandle);
+
 	return (error == SA_OK ? res_lib_ckpt_checkpointsynchronize.error : error);
 }
 
@@ -1227,15 +1349,18 @@
 	SaErrorT error;
 	struct req_lib_ckpt_checkpointsynchronizeasync req_lib_ckpt_checkpointsynchronizeasync;
 
-	error = saHandleConvert (&ckptCheckpointHandleDatabase, *checkpointHandle,
-		(void *)&ckptCheckpointInstance, CKPTCHECKPOINTINSTANCE_MUTEX_OFFSET, 0);
+	error = saHandleGet (&ckptCheckpointHandleDatabase, *checkpointHandle,
+		(void *)&ckptCheckpointInstance);
 	if (error != SA_OK) {
-		goto error_exit;
+
+		return (error);
 	}
-	error = saHandleConvert (&ckptHandleDatabase, *ckptHandle,
-		(void *)&ckptInstance, CKPTINSTANCE_MUTEX_OFFSET, 0);
+
+	error = saHandleGet (&ckptHandleDatabase, *ckptHandle, (void *)&ckptInstance);
 	if (error != SA_OK) {
-		goto error_exit;
+		saHandlePut (&ckptCheckpointHandleDatabase,
+			*checkpointHandle);
+		return (error);
 	}
 
 	req_lib_ckpt_checkpointsynchronizeasync.header.magic = MESSAGE_MAGIC;
@@ -1243,9 +1368,19 @@
 	req_lib_ckpt_checkpointsynchronizeasync.header.id = MESSAGE_REQ_CKPT_CHECKPOINT_CHECKPOINTOPENASYNC;
 	req_lib_ckpt_checkpointsynchronizeasync.invocation = invocation;
 
+	pthread_mutex_lock (&ckptCheckpointInstance->mutex);
+
+	pthread_mutex_lock (&ckptInstance->mutex);
+
 	error = saSendRetry (ckptInstance->fd, &req_lib_ckpt_checkpointsynchronizeasync,
 		sizeof (struct req_lib_ckpt_checkpointsynchronizeasync), MSG_NOSIGNAL);
 
-error_exit:
+	pthread_mutex_unlock (&ckptInstance->mutex);
+
+	pthread_mutex_unlock (&ckptCheckpointInstance->mutex);
+
+	saHandlePut (&ckptCheckpointHandleDatabase,
+		*checkpointHandle);
+
 	return (error);
 }
--- openais-cond-orig/lib/clm.c	2004-06-28 18:30:33.000000000 -0700
+++ openais-cond/lib/clm.c	2004-06-28 18:39:04.000000000 -0700
@@ -57,16 +57,18 @@
 struct clmInstance {
 	int fd;
 	SaClmCallbacksT callbacks;
-	struct message_overlay message;
+	int finalize;
 	pthread_mutex_t mutex;
+	struct saRefCount refCount;
 };
-#define CLMINSTANCE_MUTEX_OFFSET offset_of(struct clmInstance, mutex)
+
+#define CLMINSTANCE_REFCOUNT_OFFSET offset_of(struct clmInstance, refCount)
 
 static struct saHandleDatabase clmHandleDatabase = {
 	handleCount: 0,
 	handles: 0,
-	generation: 0,
-	mutex: PTHREAD_MUTEX_INITIALIZER
+	mutex: PTHREAD_MUTEX_INITIALIZER,
+	offsetToRefCount: CLMINSTANCE_REFCOUNT_OFFSET
 };
 
 /*
@@ -128,14 +130,14 @@
 	struct clmInstance *clmInstance;
 	SaErrorT error;
 
-	error = saHandleConvert (&clmHandleDatabase, *clmHandle, (void *)&clmInstance, CLMINSTANCE_MUTEX_OFFSET, 0);
+	error = saHandleGet (&clmHandleDatabase, *clmHandle, (void *)&clmInstance);
 	if (error != SA_OK) {
 		return (error);
 	}
 
 	*selectionObject = clmInstance->fd;
 
-	pthread_mutex_unlock (&clmInstance->mutex);
+	saHandlePut (&clmHandleDatabase, *clmHandle);
 	return (SA_OK);
 }
 
@@ -150,15 +152,17 @@
 	int cont = 1; /* always continue do loop except when set to 0 */
 	int dispatch_avail;
 	int poll_fd;
-	int handle_verified = 0;
 	struct clmInstance *clmInstance;
 	struct res_clm_trackcallback *res_clm_trackcallback;
 	struct res_clm_nodegetcallback *res_clm_nodegetcallback;
 	SaClmCallbacksT callbacks;
-	unsigned int gen_first;
-	unsigned int gen_second;
 	struct message_overlay dispatch_data;
 
+	error = saHandleGet (&clmHandleDatabase, *clmHandle, (void *)&clmInstance);
+	if (error != SA_OK) {
+		return (error);
+	}
+
 	/*
 	 * Timeout instantly for SA_DISPATCH_ONE or SA_DISPATCH_ALL and
 	 * wait indefinately for SA_DISPATCH_BLOCKING
@@ -168,20 +172,9 @@
 	}
 
 	do {
-		error = saHandleConvert (&clmHandleDatabase, *clmHandle, (void *)&clmInstance, CLMINSTANCE_MUTEX_OFFSET, &gen_first);
-		if (error != SA_OK) {
-			return (handle_verified ? SA_OK : error);
-		}
-		handle_verified = 1;
-
 		poll_fd = clmInstance->fd;
 
-		/*
-		 * Unlock mutex for potentially long wait in select.  If fd
-		 * is closed by clmFinalize in select, select will return
-		 */
-
-		pthread_mutex_unlock (&clmInstance->mutex);
+		saHandlePut (&clmHandleDatabase, *clmHandle);
 
 		ufds.fd = poll_fd;
 		ufds.events = POLLIN;
@@ -192,32 +185,32 @@
 			goto error_nounlock;
 		}
 
-		dispatch_avail = ufds.revents & POLLIN;
-		if (dispatch_avail == 0 && dispatchFlags == SA_DISPATCH_ALL) {
-			break; /* exit do while cont is 1 loop */
-		}
-		if (dispatch_avail == 0) {
-			continue; /* retry select */
-		}
+		pthread_mutex_lock (&clmInstance->mutex);
+
 		/*
-		 * Re-verify amfHandle
+		 * Handle has been finalized in another thread
 		 */
-		error = saHandleConvert (&clmHandleDatabase, *clmHandle, (void *)&clmInstance, CLMINSTANCE_MUTEX_OFFSET, &gen_second);
-		if (error != SA_OK) {
-			return (handle_verified ? SA_OK : error);
+		if (clmInstance->finalize == 1) {
+			error = SA_OK;
+			pthread_mutex_unlock (&clmInstance->mutex);
+			goto error_unlock;
 		}
 
-		/*
-		 * Handle has been removed and then reallocated
-		 */
-		if (gen_first != gen_second) {
-			return SA_OK;
+		dispatch_avail = ufds.revents & POLLIN;
+		if (dispatch_avail == 0 && dispatchFlags == SA_DISPATCH_ALL) {
+			pthread_mutex_unlock (&clmInstance->mutex);
+			break; /* exit do while cont is 1 loop */
+		} else
+		if (dispatch_avail == 0) {
+			pthread_mutex_unlock (&clmInstance->mutex);
+			continue; /* next poll */
 		}
 
 		/*
 		 * Read header
 		 */
-		error = saRecvRetry (clmInstance->fd, &clmInstance->message.header, sizeof (struct message_header), MSG_WAITALL | MSG_NOSIGNAL);
+		error = saRecvRetry (clmInstance->fd, &dispatch_data.header,
+			sizeof (struct message_header), MSG_WAITALL | MSG_NOSIGNAL);
 		if (error != SA_OK) {
 			goto error_unlock;
 		}
@@ -225,9 +218,9 @@
 		/*
 		 * Read data payload
 		 */
-		if (clmInstance->message.header.size > sizeof (struct message_header)) {
-			error = saRecvRetry (clmInstance->fd, &clmInstance->message.data,
-				clmInstance->message.header.size - sizeof (struct message_header), MSG_WAITALL | MSG_NOSIGNAL);
+		if (dispatch_data.header.size > sizeof (struct message_header)) {
+			error = saRecvRetry (clmInstance->fd, &dispatch_data.data,
+				dispatch_data.header.size - sizeof (struct message_header), MSG_WAITALL | MSG_NOSIGNAL);
 			if (error != SA_OK) {
 				goto error_unlock;
 			}
@@ -238,15 +231,12 @@
 		 * operate at the same time that amfFinalize has been called.
 		*/
 		memcpy (&callbacks, &clmInstance->callbacks, sizeof (SaClmCallbacksT));
-		memcpy (&dispatch_data, &clmInstance->message, sizeof (struct message_overlay));
-
 
 		pthread_mutex_unlock (&clmInstance->mutex);
-
 		/*
 		 * Dispatch incoming message
 		 */
-		switch (clmInstance->message.header.id) {
+		switch (dispatch_data.header.id) {
 
 		case MESSAGE_RES_CLM_TRACKCALLBACK:
 			res_clm_trackcallback = (struct res_clm_trackcallback *)&dispatch_data;
@@ -277,6 +267,7 @@
 			goto error_nounlock;
 			break;
 		}
+
 		/*
 		 * Determine if more messages should be processed
 		 * */
@@ -291,10 +282,8 @@
 		}
 	} while (cont);
 
-	return (error);
-
 error_unlock:
-	pthread_mutex_unlock (&clmInstance->mutex);
+	saHandlePut (&clmHandleDatabase, *clmHandle);
 error_nounlock:
 	return (error);
 }
@@ -306,20 +295,24 @@
 	struct clmInstance *clmInstance;
 	SaErrorT error;
 
-	error = saHandleConvert (&clmHandleDatabase, *clmHandle, (void *)&clmInstance, CLMINSTANCE_MUTEX_OFFSET | HANDLECONVERT_DONTUNLOCKDB, 0);
+	error = saHandleGet (&clmHandleDatabase, *clmHandle, (void *)&clmInstance);
 	if (error != SA_OK) {
 		return (error);
 	}
 
 	shutdown (clmInstance->fd, 0);
+
 	close (clmInstance->fd);
-	free (clmInstance);
 
-	error = saHandleRemove (&clmHandleDatabase, *clmHandle);
+	clmInstance->finalize = 1;
 
-	pthread_mutex_unlock (&clmInstance->mutex);
+	saHandlePut (&clmHandleDatabase, *clmHandle);
+
+	saHandlePut (&clmHandleDatabase, *clmHandle);
 
-	saHandleUnlockDatabase (&clmHandleDatabase);
+	saHandleWaitNoRefs (&clmHandleDatabase, *clmHandle);
+
+	error = saHandleRemove (&clmHandleDatabase, *clmHandle);
 
 	return (error);
 }
@@ -342,15 +335,19 @@
 	req_trackstart.notificationBufferAddress = notificationBuffer;
 	req_trackstart.numberOfItems = numberOfItems;
 
-	error = saHandleConvert (&clmHandleDatabase, *clmHandle, (void *)&clmInstance, CLMINSTANCE_MUTEX_OFFSET, 0);
+	error = saHandleGet (&clmHandleDatabase, *clmHandle, (void *)&clmInstance);
 	if (error != SA_OK) {
 		return (error);
 	}
 
+	pthread_mutex_lock (&clmInstance->mutex);
+
 	error = saSendRetry (clmInstance->fd, &req_trackstart, sizeof (struct req_clm_trackstart), MSG_NOSIGNAL);
 
 	pthread_mutex_unlock (&clmInstance->mutex);
 
+	saHandlePut (&clmHandleDatabase, *clmHandle);
+
 	return (error);
 }
 
@@ -366,15 +363,19 @@
 	req_trackstop.header.size = sizeof (struct req_clm_trackstop);
 	req_trackstop.header.id = MESSAGE_REQ_CLM_TRACKSTOP;
 
-	error = saHandleConvert (&clmHandleDatabase, *clmHandle, (void *)&clmInstance, CLMINSTANCE_MUTEX_OFFSET, 0);
+	error = saHandleGet (&clmHandleDatabase, *clmHandle, (void *)&clmInstance);
 	if (error != SA_OK) {
 		return (error);
 	}
 
+	pthread_mutex_lock (&clmInstance->mutex);
+
 	error = saSendRetry (clmInstance->fd, &req_trackstop, sizeof (struct req_clm_trackstop), MSG_NOSIGNAL);
 
 	pthread_mutex_unlock (&clmInstance->mutex);
 
+	saHandlePut (&clmHandleDatabase, *clmHandle);
+
 	return (error);
 }
 
@@ -466,14 +467,19 @@
 	memcpy (&req_clm_nodeget.nodeId, &nodeId, sizeof (SaClmNodeIdT));
 	req_clm_nodeget.clusterNodeAddress = clusterNode;
 
-	error = saHandleConvert (&clmHandleDatabase, *clmHandle, (void *)&clmInstance, CLMINSTANCE_MUTEX_OFFSET, 0);
+	error = saHandleGet (&clmHandleDatabase, *clmHandle, (void *)&clmInstance);
 	if (error != SA_OK) {
 		return (error);
 	}
 
-	error = saSendRetry (clmInstance->fd, &req_clm_nodeget, sizeof (struct req_clm_nodeget), MSG_NOSIGNAL);
+	pthread_mutex_lock (&clmInstance->mutex);
+
+	error = saSendRetry (clmInstance->fd, &req_clm_nodeget,
+		sizeof (struct req_clm_nodeget), MSG_NOSIGNAL);
 
 	pthread_mutex_unlock (&clmInstance->mutex);
 
+	saHandlePut (&clmHandleDatabase, *clmHandle);
+
 	return (error);
 }
--- openais-cond-orig/lib/util.c	2004-06-28 18:30:33.000000000 -0700
+++ openais-cond/lib/util.c	2004-06-28 18:05:56.000000000 -0700
@@ -350,6 +350,7 @@
 	void *newHandles;
 	int found = 0;
 	void *instance;
+	struct saRefCount *refCount;
 
 	pthread_mutex_lock (&handleDatabase->mutex);
 
@@ -377,11 +378,18 @@
 
 	handleDatabase->handles[handle].valid = 1;
 	handleDatabase->handles[handle].instance = instance;
-	handleDatabase->handles[handle].generation = handleDatabase->generation++;
 
 	*handleOut = handle;
 	*instanceOut = instance;
 
+	/*
+	 * Initialize reference count information
+	 */
+	refCount = instance + handleDatabase->offsetToRefCount;
+	pthread_mutex_init (&refCount->mutex, NULL);
+	pthread_cond_init (&refCount->cond, NULL);
+	refCount->refCount = 1;
+
 	pthread_mutex_unlock (&handleDatabase->mutex);
 	return (SA_OK);
 }
@@ -398,57 +406,75 @@
 }
 
 SaErrorT
-saHandleConvert (
+saHandleGet (
 	struct saHandleDatabase *handleDatabase,
 	unsigned int handle,
-	void **instance,
-	int offsetToMutex,
-	unsigned int *generationOut)
+	void **instance)
 { 
 	SaErrorT error;
-	int unlockDatabase;
-	int locking;
-
-	unlockDatabase = (0 == (offsetToMutex & HANDLECONVERT_DONTUNLOCKDB));
-	locking = (0 == (offsetToMutex & HANDLECONVERT_NOLOCKING));
-	offsetToMutex &= 0x00ffffff; /* remove 8 bits of flags */
+	struct saRefCount *refCount;
 
-	if (locking) {
-		pthread_mutex_lock (&handleDatabase->mutex);
-	}
+	pthread_mutex_lock (&handleDatabase->mutex);
 
 	error = saHandleVerify (handleDatabase, handle);
 	if (error != SA_OK) {
-		if (locking) {
-			pthread_mutex_unlock (&handleDatabase->mutex);
-		}
-		return (error);
+		goto exit_error;	
 	}
 
 	*instance = handleDatabase->handles[handle].instance;
-	if (generationOut) {
-		*generationOut = handleDatabase->handles[handle].generation;
-	}
+
+	refCount = *instance + handleDatabase->offsetToRefCount;
 
 	/*
-	 * This function exits holding the mutex in the instance instance
-	 * pointed to by offsetToMutex (if NOLOCKING isn't set)
+	 * Replace with atomic increment
 	 */
-	if (locking) {
-		pthread_mutex_lock ((pthread_mutex_t *)(*instance + offsetToMutex));
-		if (unlockDatabase) {
-			pthread_mutex_unlock (&handleDatabase->mutex);
-		}
+	pthread_mutex_lock (&refCount->mutex);
+	if (refCount->refCount == 0) {
+		error = SA_ERR_BAD_HANDLE;
+		*instance = 0;
+	} else {
+		refCount->refCount++;
 	}
+	pthread_mutex_unlock (&refCount->mutex);
+
+exit_error:
+	pthread_mutex_unlock (&handleDatabase->mutex);
+	return (error);
+}
+
+SaErrorT
+saHandlePut (
+	struct saHandleDatabase *handleDatabase,
+	unsigned int handle) 
+{
+	struct saRefCount *refCount;
+	refCount = handleDatabase->handles[handle].instance +
+		handleDatabase->offsetToRefCount;
+
+	pthread_mutex_lock (&refCount->mutex);
+	refCount->refCount -= 1;
+	if (refCount->refCount == 0) {
+		pthread_cond_broadcast (&refCount->cond);
+	}
+	pthread_mutex_unlock (&refCount->mutex);
 
 	return (SA_OK);
 }
 
 SaErrorT
-saHandleUnlockDatabase (
-	struct saHandleDatabase *handleDatabase)
+saHandleWaitNoRefs (
+	struct saHandleDatabase *handleDatabase,
+	unsigned int handle) 
 {
-	pthread_mutex_unlock (&handleDatabase->mutex);
+	struct saRefCount *refCount;
+	refCount = handleDatabase->handles[handle].instance +
+		handleDatabase->offsetToRefCount;
+
+	pthread_mutex_lock (&refCount->mutex);
+	while (refCount->refCount) {
+		pthread_cond_wait (&refCount->cond, &refCount->mutex);
+	}
+	pthread_mutex_unlock (&refCount->mutex);
 
 	return (SA_OK);
 }
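
For readers following the design notes, here is a minimal standalone sketch of the get/put/wait pattern that the util.c hunks implement. It is not the openais code: struct ref_count and the ref_* function names are hypothetical stand-ins for struct saRefCount, saHandleGet, saHandlePut and saHandleWaitNoRefs, and the handle database lookup is left out. The wait re-checks the count in a loop because POSIX allows pthread_cond_wait to return spuriously.

```c
#include <assert.h>
#include <pthread.h>

/* Hypothetical stand-in for the patch's struct saRefCount. */
struct ref_count {
	pthread_mutex_t mutex;
	pthread_cond_t cond;
	int count;
};

/* Creation takes the first reference (design note 1). */
static void ref_init (struct ref_count *rc)
{
	pthread_mutex_init (&rc->mutex, NULL);
	pthread_cond_init (&rc->cond, NULL);
	rc->count = 1;
}

/*
 * Take a reference. Returns 0 on success, or -1 if the count has
 * already dropped to zero (design note 2).
 */
static int ref_get (struct ref_count *rc)
{
	int res = 0;

	pthread_mutex_lock (&rc->mutex);
	if (rc->count == 0) {
		res = -1;
	} else {
		rc->count++;
	}
	pthread_mutex_unlock (&rc->mutex);
	return (res);
}

/* Drop a reference and wake any waiter once the count reaches zero. */
static void ref_put (struct ref_count *rc)
{
	pthread_mutex_lock (&rc->mutex);
	rc->count--;
	if (rc->count == 0) {
		pthread_cond_broadcast (&rc->cond);
	}
	pthread_mutex_unlock (&rc->mutex);
}

/* Block until no references remain (the last step of design note 3). */
static void ref_wait_no_refs (struct ref_count *rc)
{
	pthread_mutex_lock (&rc->mutex);
	while (rc->count != 0) {
		pthread_cond_wait (&rc->cond, &rc->mutex);
	}
	pthread_mutex_unlock (&rc->mutex);
}
```

In this sketch a finalize path would call ref_get, do its teardown, call ref_put twice (once for its own get, once for the reference taken at creation), and then call ref_wait_no_refs before freeing the instance.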

