[Openais] Re: just looked at some of the openais code, have some comments
Steven Dake
sdake at mvista.com
Tue Jun 29 12:33:03 PDT 2004
Chris,
Attached is a patch that uses reference counting to track the users of
a handle.
The ugly locking inside saHandleConvert (now replaced by saHandleGet and
saHandlePut) is gone, and the code looks much cleaner.
Also, it is now possible to make an API call from within a callback.
I have tested that dispatch operates correctly when Finalize is called,
and all of the APIs seem to operate correctly.
Some design notes:
1. saHandleCreate sets the reference count to 1 (this is called by the
initialize functions of each API).
2. saHandleGet will not allow a handle to be taken if the reference
count is 0, even if the handle is still valid in the database.
3. Finalize does a get to obtain the instance data, a put to release
that reference, a second put to release the reference taken by the
initialize call (so if there are no other users, the count is now 0),
and finally waits for the refcount to reach zero before deleting the
handle.
#3 is a little mystical; I think some comments would probably help, but
the code is less mystical than it was :)
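
To make the three design notes above a bit more concrete, here is a
minimal sketch of the get/put/wait sequence for a single handle. Every
name in it (demo_handle, demo_handle_get, and so on) is hypothetical and
chosen for illustration; it is not the code in the attached patch, which
keeps the refcount inside each instance structure and locates it through
the handle database.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

/* Hypothetical names throughout; this is not the openais implementation. */
enum { DEMO_OK = 0, DEMO_ERR_BAD_HANDLE = 1 };

struct demo_handle {
	pthread_mutex_t lock;
	pthread_cond_t no_refs;	/* signalled when ref_count reaches 0 */
	int valid;		/* still present in the "database" */
	int ref_count;		/* number of current users */
	void *instance;
};

/* Note 1: creation starts the count at 1 (the initialize reference). */
void demo_handle_create (struct demo_handle *h, void *instance)
{
	pthread_mutex_init (&h->lock, NULL);
	pthread_cond_init (&h->no_refs, NULL);
	h->valid = 1;
	h->ref_count = 1;
	h->instance = instance;
}

/* Note 2: a get is refused once the count is 0, even if still valid. */
int demo_handle_get (struct demo_handle *h, void **instance)
{
	int res = DEMO_ERR_BAD_HANDLE;

	pthread_mutex_lock (&h->lock);
	if (h->valid && h->ref_count > 0) {
		h->ref_count++;
		*instance = h->instance;
		res = DEMO_OK;
	}
	pthread_mutex_unlock (&h->lock);
	return (res);
}

/* Drop one reference and wake any waiter when the last one goes away. */
void demo_handle_put (struct demo_handle *h)
{
	pthread_mutex_lock (&h->lock);
	assert (h->ref_count > 0);
	if (--h->ref_count == 0) {
		pthread_cond_broadcast (&h->no_refs);
	}
	pthread_mutex_unlock (&h->lock);
}

/* Block until no user holds a reference. */
void demo_handle_wait_no_refs (struct demo_handle *h)
{
	pthread_mutex_lock (&h->lock);
	while (h->ref_count > 0) {
		pthread_cond_wait (&h->no_refs, &h->lock);
	}
	pthread_mutex_unlock (&h->lock);
}

/* Note 3: get, put, put (initialize reference), wait, then remove. */
int demo_finalize (struct demo_handle *h)
{
	void *instance;
	int res = demo_handle_get (h, &instance);

	if (res != DEMO_OK) {
		return (res);
	}
	(void)instance;			/* shut down the instance here */
	demo_handle_put (h);		/* release the get above */
	demo_handle_put (h);		/* release the initialize reference */
	demo_handle_wait_no_refs (h);	/* other users finish their puts */

	pthread_mutex_lock (&h->lock);
	h->valid = 0;			/* stands in for the database removal */
	pthread_mutex_unlock (&h->lock);
	return (DEMO_OK);
}
```

In this sketch a thread that already holds a reference keeps it until
its own put, while any later get fails because the count has already
dropped to 0. That is the property that lets finalize wait safely
instead of holding a mutex across callbacks.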
If you have time and can take a look before I commit to bk, I'd
appreciate it.
Thanks!
-steve
On Tue, 2004-06-29 at 10:24, Chris Friesen wrote:
> Steven Dake wrote:
> > I am likely to make a
> > portability library of things like threads/condition variables/reference
> > counting for the library. That library could conditionally noop the
> > code for threads if necessary.
>
> Sounds good.
>
> > I feel a little uncomfortable, as a library, changing signal handlers.
> > The SA Forum AIS WG is in process of specifying what exactly happens in
> > a signal handler (should APIs be allowed or not). I'll take this input
> > to the specs group.
>
> Yeah. It's always a bit of a dilemma as to how to deal with signals. You could
> just put a stake in the ground and say that no library functions are async-safe,
> so if the user wants to use them in signal handlers they would just have to
> block signals around all library calls.
>
> > You mean add a flag to saClmDispatch? The downside of that approach is
> > that the API then becomes nonstandard for a small performance payoff.
>
> Yes, that's what I had in mind. However, I had forgotten that there were
> standards that you were coding to.
>
> > I'd like to match the AIS standard as much as possible, even when it
> > means performance won't be right. The longer-term approach to this
> > problem is to drive that kind of feedback into the specs group which
> > I'll do.
>
> Cool. That'd be great.
>
> > I see the light now... Refcounting is the way to go, especially for
> > handling dispatch and finalize calls. Then the mutex won't have to be
> > held across callbacks and a bunch of other problems go away too.
>
> I'm glad you like the refcount idea. I don't know how familiar you are with the
> kernel code, but they use refcounts all over the place.
>
> I've been looking at the code a bit more and have a few other comments:
>
> 1) In the test programs, you've got select() being called in while loops. When
> select() returns, the fd sets will be modified to reflect what fds had events
> occur. If you get a signal, you can exit select() with empty fd sets, and you
> will no longer monitor anything. You need to either copy the fd set to a
> working one each loop, or else recreate the fd set each time through the loop.
>
> 2) There's a small niggle in testamf.c, HealthcheckCallback(). The print
> statement will run every time since the loop is commented out, but it prints
> that 20 checks have occurred.
>
> 3) It might be nice to be able to set the cluster name and node name.
>
> 4) The magical version of
>
> SaVersionT version = { 'A', 1, 1 };
>
> is a bit confusing. I wonder if it could be made a constant or something rather
> than having to be explicitly typed out by the user?
>
> Chris
>
>
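
Chris's point 1 about select() can be sketched as follows: keep one
fd set describing what is monitored, and hand select() a fresh working
copy on every pass, so that whatever select() does to the copy (for
example clearing it on a timeout) never affects the monitored set. The
helper name demo_select_ready and its parameters are assumptions made
for this illustration; they do not appear in the openais test programs.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <sys/select.h>
#include <sys/time.h>
#include <unistd.h>

/*
 * Hypothetical helper: wait up to timeout_ms for any descriptor in
 * *monitored to become readable. *monitored is never modified; the
 * result of select() is delivered in *ready. Returns what select()
 * returns, retrying when a signal interrupts the call.
 */
int demo_select_ready (const fd_set *monitored, int max_fd,
	fd_set *ready, long timeout_ms)
{
	int res;

	assert (max_fd >= 0 && max_fd < FD_SETSIZE);
	do {
		struct timeval tv;

		/* select() may change the timeout too, so rebuild it */
		tv.tv_sec = timeout_ms / 1000;
		tv.tv_usec = (timeout_ms % 1000) * 1000;

		/* fresh working copy each time through the loop */
		*ready = *monitored;

		res = select (max_fd + 1, ready, NULL, NULL, &tv);
	} while (res == -1 && errno == EINTR);

	return (res);
}
```

A caller would build the monitored set once with FD_ZERO and FD_SET,
then call the helper inside its while loop and test the ready set with
FD_ISSET. The alternative Chris mentions, rebuilding the set from
scratch on every iteration, works equally well when only a few
descriptors are involved.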
-------------- next part --------------
--- openais-cond-orig/lib/amf.c 2004-06-28 18:30:33.000000000 -0700
+++ openais-cond/lib/amf.c 2004-06-28 17:01:46.000000000 -0700
@@ -33,25 +33,6 @@
* THE POSSIBILITY OF SUCH DAMAGE.
*/
-/*
- * thread locking model is as follows
- *
- * APIs that use handles:
- *
- * Every handle database has a lock.
- * Each interface started with SaAmfInitialize has a lock.
- * Handle database lock is taken.
- * amfInstance lock is taken.
- * Handle database lock is released early.
- * amfInstance lock is released after amfInstance is out of use.
- *
- * Finalize API:
- * Handle database lock is taken
- * amf instance lock is taken
- * handle is removed
- * amf instance lock is released
- * handle database lock is released
- */
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
@@ -83,10 +64,11 @@
struct queue inq;
SaNameT compName;
int compRegistered;
- struct message_overlay message;
+ int finalize;
pthread_mutex_t mutex;
+ struct saRefCount refCount;
};
-#define AMFINSTANCE_MUTEX_OFFSET offset_of(struct amfInstance, mutex)
+#define AMFINSTANCE_REFCOUNT_OFFSET offset_of(struct amfInstance, refCount)
/*
* All instances in one database
@@ -94,8 +76,8 @@
static struct saHandleDatabase amfHandleDatabase = {
handleCount: 0,
handles: 0,
- generation: 0,
- mutex: PTHREAD_MUTEX_INITIALIZER
+ mutex: PTHREAD_MUTEX_INITIALIZER,
+ offsetToRefCount: AMFINSTANCE_REFCOUNT_OFFSET
};
/*
@@ -170,14 +152,14 @@
struct amfInstance *amfInstance;
SaErrorT error;
- error = saHandleConvert (&amfHandleDatabase, *amfHandle, (void *)&amfInstance, AMFINSTANCE_MUTEX_OFFSET, 0);
+ error = saHandleGet (&amfHandleDatabase, *amfHandle, (void *)&amfInstance);
if (error != SA_OK) {
return (error);
}
*selectionObject = amfInstance->fd;
- pthread_mutex_unlock (&amfInstance->mutex);
+ saHandlePut (&amfHandleDatabase, *amfHandle);
return (SA_OK);
}
@@ -202,12 +184,15 @@
int empty;
int ignore_dispatch = 0;
int cont = 1; /* always continue do loop except when set to 0 */
- int handle_verified = 0;
int poll_fd;
- unsigned int gen_first;
- unsigned int gen_second;
struct message_overlay dispatch_data;
+ error = saHandleGet (&amfHandleDatabase, *amfHandle,
+ (void *)&amfInstance);
+ if (error != SA_OK) {
+ return (error);
+ }
+
/*
* Timeout instantly for SA_DISPATCH_ALL
*/
@@ -216,26 +201,9 @@
}
do {
- /*
- * If flags are SA_DISPATCH_BLOCKING and handle has been
- * verified, return SA_OK because a Finalize has been
- * called. Else return error from saHandleConvert
- */
- error = saHandleConvert (&amfHandleDatabase, *amfHandle, (void *)&amfInstance, AMFINSTANCE_MUTEX_OFFSET, &gen_first);
- if (error != SA_OK) {
- return (handle_verified ? SA_OK : error);
- }
- handle_verified = 1;
-
poll_fd = amfInstance->fd;
/*
- * Unlock mutex for potentially long wait in select. If fd
- * is closed by amfFinalize in select, select will return
- */
- pthread_mutex_unlock (&amfInstance->mutex);
-
- /*
* Read data directly from socket
*/
ufds.fd = poll_fd;
@@ -247,28 +215,27 @@
goto error_nounlock;
}
+ pthread_mutex_lock (&amfInstance->mutex);
+
+ /*
+ * Handle has been finalized in another thread
+ */
+ if (amfInstance->finalize == 1) {
+ error = SA_OK;
+ pthread_mutex_unlock (&amfInstance->mutex);
+ goto error_unlock;
+ }
+
dispatch_avail = ufds.revents & POLLIN;
if (dispatch_avail == 0 && dispatchFlags == SA_DISPATCH_ALL) {
+ pthread_mutex_unlock (&amfInstance->mutex);
break; /* exit do while cont is 1 loop */
} else
if (dispatch_avail == 0) {
- continue; /* next select */
- }
-
- /*
- * Re-verify amfHandle
- */
- error = saHandleConvert (&amfHandleDatabase, *amfHandle, (void *)&amfInstance, AMFINSTANCE_MUTEX_OFFSET, &gen_second);
- if (error != SA_OK) {
- return (handle_verified ? SA_OK : error);
+ pthread_mutex_unlock (&amfInstance->mutex);
+ continue; /* next poll */
}
- /*
- * Handle has been removed and then reallocated
- */
- if (gen_first != gen_second) {
- return (SA_OK);
- }
saQueueIsEmpty(&amfInstance->inq, &empty);
if (empty == 0) {
/*
@@ -276,19 +243,21 @@
*/
saQueueItemGet (&amfInstance->inq, (void *)&queue_msg);
msg = *queue_msg;
- memcpy (&amfInstance->message, msg, msg->size);
+ memcpy (&dispatch_data, msg, msg->size);
saQueueItemRemove (&amfInstance->inq);
} else {
/*
* Queue empty, read response from socket
*/
- error = saRecvRetry (amfInstance->fd, &amfInstance->message.header, sizeof (struct message_header), MSG_WAITALL | MSG_NOSIGNAL);
+ error = saRecvRetry (amfInstance->fd, &dispatch_data.header,
+ sizeof (struct message_header), MSG_WAITALL | MSG_NOSIGNAL);
if (error != SA_OK) {
goto error_unlock;
}
- if (amfInstance->message.header.size > sizeof (struct message_header)) {
- error = saRecvRetry (amfInstance->fd, &amfInstance->message.data,
- amfInstance->message.header.size - sizeof (struct message_header), MSG_WAITALL | MSG_NOSIGNAL);
+ if (dispatch_data.header.size > sizeof (struct message_header)) {
+ error = saRecvRetry (amfInstance->fd, &dispatch_data.data,
+ dispatch_data.header.size - sizeof (struct message_header),
+ MSG_WAITALL | MSG_NOSIGNAL);
if (error != SA_OK) {
goto error_unlock;
}
@@ -300,14 +269,13 @@
* operate at the same time that amfFinalize has been called in another thread.
*/
memcpy (&callbacks, &amfInstance->callbacks, sizeof (SaAmfCallbacksT));
- memcpy (&dispatch_data, &amfInstance->message, sizeof (struct message_overlay));
-
pthread_mutex_unlock (&amfInstance->mutex);
+
/*
* Dispatch incoming response
*/
- switch (amfInstance->message.header.id) {
+ switch (dispatch_data.header.id) {
case MESSAGE_RES_AMF_ACTIVATEPOLL:
/*
* This is a do nothing message which the node executive sends
@@ -398,10 +366,8 @@
}
} while (cont);
- return (error);
-
error_unlock:
- pthread_mutex_unlock (&amfInstance->mutex);
+ saHandlePut (&amfHandleDatabase, *amfHandle);
error_nounlock:
return (error);
}
@@ -413,21 +379,27 @@
struct amfInstance *amfInstance;
SaErrorT error;
- error = saHandleConvert (&amfHandleDatabase, *amfHandle, (void *)&amfInstance, AMFINSTANCE_MUTEX_OFFSET | HANDLECONVERT_DONTUNLOCKDB, 0);
+ error = saHandleGet (&amfHandleDatabase, *amfHandle, (void *)&amfInstance);
if (error != SA_OK) {
return (error);
}
shutdown (amfInstance->fd, 0);
+
close (amfInstance->fd);
+
free (amfInstance->inq.items);
- error = saHandleRemove (&amfHandleDatabase, *amfHandle);
+ amfInstance->finalize = 1;
- pthread_mutex_unlock (&amfInstance->mutex);
+ saHandlePut (&amfHandleDatabase, *amfHandle);
+
+ saHandlePut (&amfHandleDatabase, *amfHandle);
- saHandleUnlockDatabase (&amfHandleDatabase);
+ saHandleWaitNoRefs (&amfHandleDatabase, *amfHandle);
+ error = saHandleRemove (&amfHandleDatabase, *amfHandle);
+
return (error);
}
@@ -441,6 +413,7 @@
SaErrorT error;
struct req_lib_amf_componentregister req_lib_amf_componentregister;
struct res_lib_amf_componentregister *res_lib_amf_componentregister;
+ struct message_overlay message;
req_lib_amf_componentregister.header.magic = MESSAGE_MAGIC;
req_lib_amf_componentregister.header.size = sizeof (struct req_lib_amf_componentregister);
@@ -452,11 +425,13 @@
memset (&req_lib_amf_componentregister.proxyCompName, 0, sizeof (SaNameT));
}
- error = saHandleConvert (&amfHandleDatabase, *amfHandle, (void *)&amfInstance, AMFINSTANCE_MUTEX_OFFSET, 0);
+ error = saHandleGet (&amfHandleDatabase, *amfHandle, (void *)&amfInstance);
if (error != SA_OK) {
return (error);
}
+ pthread_mutex_lock (&amfInstance->mutex);
+
error = saSendRetry (amfInstance->fd, &req_lib_amf_componentregister, sizeof (struct req_lib_amf_componentregister), MSG_NOSIGNAL);
if (error != SA_OK) {
goto error_unlock;
@@ -468,13 +443,13 @@
* This must be done to avoid dropping async messages
* during this sync message retrieval
*/
- error = saRecvQueue (amfInstance->fd, &amfInstance->message,
+ error = saRecvQueue (amfInstance->fd, &message,
&amfInstance->inq, MESSAGE_RES_AMF_COMPONENTREGISTER);
if (error != SA_OK) {
goto error_unlock;
}
- res_lib_amf_componentregister = (struct res_lib_amf_componentregister *)&amfInstance->message;
+ res_lib_amf_componentregister = (struct res_lib_amf_componentregister *)&message;
if (res_lib_amf_componentregister->error == SA_OK) {
amfInstance->compRegistered = 1;
memcpy (&amfInstance->compName, compName, sizeof (SaNameT));
@@ -484,6 +459,7 @@
error_unlock:
pthread_mutex_unlock (&amfInstance->mutex);
+ saHandlePut (&amfHandleDatabase, *amfHandle);
return (error);
}
@@ -496,6 +472,7 @@
struct req_lib_amf_componentunregister req_lib_amf_componentunregister;
struct res_lib_amf_componentunregister *res_lib_amf_componentunregister;
struct amfInstance *amfInstance;
+ struct message_overlay message;
SaErrorT error;
req_lib_amf_componentunregister.header.magic = MESSAGE_MAGIC;
@@ -508,11 +485,13 @@
memset (&req_lib_amf_componentunregister.proxyCompName, 0, sizeof (SaNameT));
}
- error = saHandleConvert (&amfHandleDatabase, *amfHandle, (void *)&amfInstance, AMFINSTANCE_MUTEX_OFFSET, 0);
+ error = saHandleGet (&amfHandleDatabase, *amfHandle, (void *)&amfInstance);
if (error != SA_OK) {
return (error);
}
+ pthread_mutex_lock (&amfInstance->mutex);
+
error = saSendRetry (amfInstance->fd, &req_lib_amf_componentunregister,
sizeof (struct req_lib_amf_componentunregister), MSG_NOSIGNAL);
if (error != SA_OK) {
@@ -525,12 +504,13 @@
* This must be done to avoid dropping async messages
* during this sync message retrieval
*/
- error = saRecvQueue (amfInstance->fd, &amfInstance->message,
+ error = saRecvQueue (amfInstance->fd, &message,
&amfInstance->inq, MESSAGE_RES_AMF_COMPONENTUNREGISTER);
if (error != SA_OK) {
goto error_unlock;
}
- res_lib_amf_componentunregister = (struct res_lib_amf_componentunregister *)&amfInstance->message;
+
+ res_lib_amf_componentunregister = (struct res_lib_amf_componentunregister *)&message;
if (res_lib_amf_componentunregister->error == SA_OK) {
amfInstance->compRegistered = 0;
}
@@ -538,6 +518,7 @@
error_unlock:
pthread_mutex_unlock (&amfInstance->mutex);
+ saHandlePut (&amfHandleDatabase, *amfHandle);
return (error);
}
@@ -549,7 +530,7 @@
struct amfInstance *amfInstance;
SaErrorT error;
- error = saHandleConvert (&amfHandleDatabase, *amfHandle, (void *)&amfInstance, AMFINSTANCE_MUTEX_OFFSET, 0);
+ error = saHandleGet (&amfHandleDatabase, *amfHandle, (void *)&amfInstance);
if (error != SA_OK) {
return (error);
}
@@ -559,7 +540,7 @@
}
memcpy (compName, &amfInstance->compName, sizeof (SaNameT));
- pthread_mutex_unlock (&amfInstance->mutex);
+ saHandlePut (&amfHandleDatabase, *amfHandle);
return (SA_OK);
}
@@ -686,6 +667,7 @@
struct amfInstance *amfInstance;
struct req_amf_protectiongrouptrackstart req_amf_protectiongrouptrackstart;
struct res_amf_protectiongrouptrackstart *res_amf_protectiongrouptrackstart;
+ struct message_overlay message;
SaErrorT error;
req_amf_protectiongrouptrackstart.header.magic = MESSAGE_MAGIC;
@@ -696,33 +678,29 @@
req_amf_protectiongrouptrackstart.notificationBufferAddress = (SaAmfProtectionGroupNotificationT *)notificationBuffer;
req_amf_protectiongrouptrackstart.numberOfItems = numberOfItems;
- error = saHandleConvert (&amfHandleDatabase, *amfHandle, (void *)&amfInstance, AMFINSTANCE_MUTEX_OFFSET, 0);
+ error = saHandleGet (&amfHandleDatabase, *amfHandle, (void *)&amfInstance);
if (error != SA_OK) {
return (error);
}
+ pthread_mutex_lock (&amfInstance->mutex);
+
error = saSendRetry (amfInstance->fd, &req_amf_protectiongrouptrackstart,
sizeof (struct req_amf_protectiongrouptrackstart), MSG_NOSIGNAL);
if (error != SA_OK) {
goto error_unlock;
}
- error = saRecvQueue (amfInstance->fd, &amfInstance->message,
+ error = saRecvQueue (amfInstance->fd, &message,
&amfInstance->inq, MESSAGE_RES_AMF_PROTECTIONGROUPTRACKSTART);
- pthread_mutex_unlock (&amfInstance->mutex);
-
- res_amf_protectiongrouptrackstart = (struct res_amf_protectiongrouptrackstart *)&amfInstance->message;
+ res_amf_protectiongrouptrackstart = (struct res_amf_protectiongrouptrackstart *)&message;
- if (error == SA_OK) {
- return (res_amf_protectiongrouptrackstart->error);
- }
-
- return (error);
+ error = res_amf_protectiongrouptrackstart->error;
error_unlock:
pthread_mutex_unlock (&amfInstance->mutex);
-
+ saHandlePut (&amfHandleDatabase, *amfHandle);
return (error);
}
@@ -734,6 +712,7 @@
struct amfInstance *amfInstance;
struct req_amf_protectiongrouptrackstop req_amf_protectiongrouptrackstop;
struct res_amf_protectiongrouptrackstop *res_amf_protectiongrouptrackstop;
+ struct message_overlay message;
SaErrorT error;
req_amf_protectiongrouptrackstop.header.magic = MESSAGE_MAGIC;
@@ -741,32 +720,31 @@
req_amf_protectiongrouptrackstop.header.id = MESSAGE_REQ_AMF_PROTECTIONGROUPTRACKSTOP;
memcpy (&req_amf_protectiongrouptrackstop.csiName, csiName, sizeof (SaNameT));
- error = saHandleConvert (&amfHandleDatabase, *amfHandle, (void *)&amfInstance, AMFINSTANCE_MUTEX_OFFSET, 0);
+ error = saHandleGet (&amfHandleDatabase, *amfHandle, (void *)&amfInstance);
if (error != SA_OK) {
return (error);
}
+ pthread_mutex_lock (&amfInstance->mutex);
+
error = saSendRetry (amfInstance->fd, &req_amf_protectiongrouptrackstop,
sizeof (struct req_amf_protectiongrouptrackstop), MSG_NOSIGNAL);
if (error != SA_OK) {
goto error_unlock;
}
- error = saRecvQueue (amfInstance->fd, &amfInstance->message,
+ error = saRecvQueue (amfInstance->fd, &message,
&amfInstance->inq, MESSAGE_RES_AMF_PROTECTIONGROUPTRACKSTOP);
- pthread_mutex_unlock (&amfInstance->mutex);
-
- res_amf_protectiongrouptrackstop = (struct res_amf_protectiongrouptrackstop *)&amfInstance->message;
+ saHandlePut (&amfHandleDatabase, *amfHandle);
- if (error == SA_OK) {
- return (res_amf_protectiongrouptrackstop->error);
- }
+ res_amf_protectiongrouptrackstop = (struct res_amf_protectiongrouptrackstop *)&message;
- return (error);
+ error = res_amf_protectiongrouptrackstop->error;
error_unlock:
pthread_mutex_unlock (&amfInstance->mutex);
+ saHandlePut (&amfHandleDatabase, *amfHandle);
return (error);
}
--- openais-cond-orig/lib/ckpt.c 2004-06-28 18:30:33.000000000 -0700
+++ openais-cond/lib/ckpt.c 2004-06-28 18:43:54.000000000 -0700
@@ -62,58 +62,62 @@
int fd;
struct queue inq;
SaCkptCallbacksT callbacks;
+ int finalize;
pthread_mutex_t mutex;
+ struct saRefCount refCount;
};
-#define CKPTINSTANCE_MUTEX_OFFSET HANDLECONVERT_NOLOCKING
+#define CKPTINSTANCE_REFCOUNT_OFFSET offset_of(struct ckptInstance, refCount)
struct ckptCheckpointInstance {
int fd;
SaNameT checkpointName;
SaUint32T maxSectionIdSize;
+ int finalize;
pthread_mutex_t mutex;
+ struct saRefCount refCount;
};
-//#define CKPTCHECKPOINTINSTANCE_MUTEX_OFFSET offset_of(struct ckptCheckpointInstance, mutex)
-#define CKPTCHECKPOINTINSTANCE_MUTEX_OFFSET_DEMO offset_of(struct ckptCheckpointInstance, mutex)
-#define CKPTCHECKPOINTINSTANCE_MUTEX_OFFSET HANDLECONVERT_NOLOCKING
+#define CKPTCHECKPOINTINSTANCE_REFCOUNT_OFFSET offset_of(struct ckptCheckpointInstance, refCount)
struct ckptSectionIteratorInstance {
int fd;
struct list_head sectionIdListHead;
SaUint32T maxSectionIdSize;
+ int finalize;
pthread_mutex_t mutex;
+ struct saRefCount refCount;
};
-//#define CKPTSECTIONITERATORINSTANCE_MUTEX_OFFSET offset_of(struct ckptSectionIteratorInstance, mutex)
-#define CKPTSECTIONITERATORINSTANCE_MUTEX_OFFSET HANDLECONVERT_NOLOCKING
+#define CKPTSECTIONITERATORINSTANCE_REFCOUNT_OFFSET offset_of(struct ckptSectionIteratorInstance, refCount)
/*
* All CKPT instances in this database
*/
static struct saHandleDatabase ckptHandleDatabase = {
- handleCount: 0,
- handles: 0,
- generation: 0,
- mutex: PTHREAD_MUTEX_INITIALIZER
+ handleCount: 0,
+ handles: 0,
+ mutex: PTHREAD_MUTEX_INITIALIZER,
+ offsetToRefCount: CKPTINSTANCE_REFCOUNT_OFFSET
+
};
/*
* All Checkpoint instances in this database
*/
static struct saHandleDatabase ckptCheckpointHandleDatabase = {
- handleCount: 0,
- handles: 0,
- generation: 0,
- mutex: PTHREAD_MUTEX_INITIALIZER
+ handleCount: 0,
+ handles: 0,
+ mutex: PTHREAD_MUTEX_INITIALIZER,
+ offsetToRefCount: CKPTCHECKPOINTINSTANCE_REFCOUNT_OFFSET
};
/*
* All section iterators in this database
*/
static struct saHandleDatabase ckptSectionIteratorHandleDatabase = {
- handleCount: 0,
- handles: 0,
- generation: 0,
- mutex: PTHREAD_MUTEX_INITIALIZER
+ handleCount: 0,
+ handles: 0,
+ mutex: PTHREAD_MUTEX_INITIALIZER,
+ offsetToRefCount: CKPTSECTIONITERATORINSTANCE_REFCOUNT_OFFSET
};
/*
@@ -192,12 +196,15 @@
struct ckptInstance *ckptInstance;
SaErrorT error;
- error = saHandleConvert (&ckptHandleDatabase, *ckptHandle, (void *)&ckptInstance, CKPTINSTANCE_MUTEX_OFFSET, 0);
+ error = saHandleGet (&ckptHandleDatabase, *ckptHandle, (void *)&ckptInstance);
if (error != SA_OK) {
return (error);
}
*selectionObject = ckptInstance->fd;
+
+ saHandlePut (&ckptHandleDatabase, *ckptHandle);
+
return (SA_OK);
}
@@ -218,7 +225,7 @@
int ignore_dispatch = 0;
int cont = 1; /* always continue do loop except when set to 0 */
- error = saHandleConvert (&ckptHandleDatabase, *ckptHandle, (void *)&ckptInstance, CKPTINSTANCE_MUTEX_OFFSET, 0);
+ error = saHandleGet (&ckptHandleDatabase, *ckptHandle, (void *)&ckptInstance);
if (error != SA_OK) {
return (error);
}
@@ -382,14 +389,27 @@
struct ckptInstance *ckptInstance;
SaErrorT error;
- error = saHandleConvert (&ckptHandleDatabase, *ckptHandle, (void *)&ckptInstance, CKPTINSTANCE_MUTEX_OFFSET, 0);
+ error = saHandleGet (&ckptHandleDatabase, *ckptHandle, (void *)&ckptInstance);
if (error != SA_OK) {
return (error);
}
+ shutdown (ckptInstance->fd, 0);
+
close (ckptInstance->fd);
+
free (ckptInstance->inq.items);
+
+ ckptInstance->finalize = 1;
+
+ saHandlePut (&ckptHandleDatabase, *ckptHandle);
+
+ saHandlePut (&ckptHandleDatabase, *ckptHandle);
+
+ saHandleWaitNoRefs (&ckptHandleDatabase, *ckptHandle);
+
saHandleRemove (&ckptHandleDatabase, *ckptHandle);
+
return (SA_OK);
}
@@ -420,8 +440,6 @@
goto error_free;
}
- pthread_mutex_init (&ckptCheckpointInstance->mutex, NULL);
-
req_lib_ckpt_checkpointopen.header.magic = MESSAGE_MAGIC;
req_lib_ckpt_checkpointopen.header.size = sizeof (struct req_lib_ckpt_checkpointopen);
req_lib_ckpt_checkpointopen.header.id = MESSAGE_REQ_CKPT_CHECKPOINT_CHECKPOINTOPEN;
@@ -472,7 +490,7 @@
SaErrorT error;
struct req_lib_ckpt_checkpointopenasync req_lib_ckpt_checkpointopenasync;
- error = saHandleConvert (&ckptHandleDatabase, *ckptHandle, (void *)&ckptInstance, CKPTINSTANCE_MUTEX_OFFSET, 0);
+ error = saHandleGet (&ckptHandleDatabase, *ckptHandle, (void *)&ckptInstance);
if (error != SA_OK) {
return (error);
}
@@ -488,9 +506,15 @@
req_lib_ckpt_checkpointopenasync.checkpointOpenFlags = checkpointOpenFlags;
+ pthread_mutex_lock (&ckptInstance->mutex);
+
error = saSendRetry (ckptInstance->fd, &req_lib_ckpt_checkpointopenasync,
sizeof (struct req_lib_ckpt_checkpointopenasync), MSG_NOSIGNAL);
+ pthread_mutex_unlock (&ckptInstance->mutex);
+
+ saHandlePut (&ckptHandleDatabase, *ckptHandle);
+
return (error);
}
@@ -501,13 +525,24 @@
SaErrorT error;
struct ckptCheckpointInstance *ckptCheckpointInstance;
- error = saHandleConvert (&ckptCheckpointHandleDatabase, *checkpointHandle,
- (void *)&ckptCheckpointInstance, CKPTCHECKPOINTINSTANCE_MUTEX_OFFSET, 0);
+ error = saHandleGet (&ckptCheckpointHandleDatabase, *checkpointHandle,
+ (void *)&ckptCheckpointInstance);
if (error != SA_OK) {
goto error_exit;
}
+ shutdown (ckptCheckpointInstance->fd, 0);
+
close (ckptCheckpointInstance->fd);
+
+ ckptCheckpointInstance->finalize = 1;
+
+ saHandlePut (&ckptCheckpointHandleDatabase, *checkpointHandle);
+
+ saHandlePut (&ckptCheckpointHandleDatabase, *checkpointHandle);
+
+ saHandleWaitNoRefs (&ckptCheckpointHandleDatabase, *checkpointHandle);
+
error = saHandleRemove (&ckptCheckpointHandleDatabase, *checkpointHandle);
error_exit:
@@ -558,7 +593,8 @@
struct ckptCheckpointInstance *ckptCheckpointInstance;
struct req_lib_ckpt_checkpointretentiondurationset req_lib_ckpt_checkpointretentiondurationset;
- error = saHandleConvert (&ckptCheckpointHandleDatabase, *checkpointHandle, (void *)&ckptCheckpointInstance, CKPTCHECKPOINTINSTANCE_MUTEX_OFFSET, 0);
+ error = saHandleGet (&ckptCheckpointHandleDatabase, *checkpointHandle,
+ (void *)&ckptCheckpointInstance);
if (error != SA_OK) {
goto error_exit;
}
@@ -567,10 +603,13 @@
req_lib_ckpt_checkpointretentiondurationset.header.size = sizeof (struct req_lib_ckpt_checkpointretentiondurationset);
req_lib_ckpt_checkpointretentiondurationset.header.id = MESSAGE_REQ_CKPT_CHECKPOINT_CHECKPOINTRETENTIONDURATIONSET;
+ pthread_mutex_lock (&ckptCheckpointInstance->mutex);
+
error = saSendRetry (ckptCheckpointInstance->fd, &req_lib_ckpt_checkpointretentiondurationset, sizeof (struct req_lib_ckpt_checkpointretentiondurationset), MSG_NOSIGNAL);
- if (error != SA_OK) {
- goto error_exit;
- }
+
+ pthread_mutex_unlock (&ckptCheckpointInstance->mutex);
+
+ saHandlePut (&ckptCheckpointHandleDatabase, *checkpointHandle);
error_exit:
return (error);
@@ -585,7 +624,8 @@
struct req_lib_ckpt_activecheckpointset req_lib_ckpt_activecheckpointset;
struct res_lib_ckpt_activecheckpointset res_lib_ckpt_activecheckpointset;
- error = saHandleConvert (&ckptCheckpointHandleDatabase, *checkpointHandle, (void *)&ckptCheckpointInstance, CKPTCHECKPOINTINSTANCE_MUTEX_OFFSET, 0);
+ error = saHandleGet (&ckptCheckpointHandleDatabase, *checkpointHandle,
+ (void *)&ckptCheckpointInstance);
if (error != SA_OK) {
goto error_exit;
}
@@ -594,13 +634,21 @@
req_lib_ckpt_activecheckpointset.header.size = sizeof (struct req_lib_ckpt_activecheckpointset);
req_lib_ckpt_activecheckpointset.header.id = MESSAGE_REQ_CKPT_CHECKPOINT_ACTIVECHECKPOINTSET;
+ pthread_mutex_lock (&ckptCheckpointInstance->mutex);
+
error = saSendRetry (ckptCheckpointInstance->fd, &req_lib_ckpt_activecheckpointset,
sizeof (struct req_lib_ckpt_activecheckpointset), MSG_NOSIGNAL);
if (error != SA_OK) {
goto error_exit;
}
- error = saRecvQueue (ckptCheckpointInstance->fd, &res_lib_ckpt_activecheckpointset, 0, MESSAGE_RES_CKPT_CHECKPOINT_ACTIVECHECKPOINTSET);
+ error = saRecvQueue (ckptCheckpointInstance->fd,
+ &res_lib_ckpt_activecheckpointset, 0,
+ MESSAGE_RES_CKPT_CHECKPOINT_ACTIVECHECKPOINTSET);
+
+ pthread_mutex_unlock (&ckptCheckpointInstance->mutex);
+
+ saHandlePut (&ckptCheckpointHandleDatabase, *checkpointHandle);
error_exit:
return (error == SA_OK ? res_lib_ckpt_activecheckpointset.error : error);
@@ -616,16 +664,18 @@
struct req_lib_ckpt_checkpointstatusget req_lib_ckpt_checkpointstatusget;
struct res_lib_ckpt_checkpointstatusget res_lib_ckpt_checkpointstatusget;
- error = saHandleConvert (&ckptCheckpointHandleDatabase, *checkpointHandle,
- (void *)&ckptCheckpointInstance, CKPTCHECKPOINTINSTANCE_MUTEX_OFFSET, 0);
+ error = saHandleGet (&ckptCheckpointHandleDatabase, *checkpointHandle,
+ (void *)&ckptCheckpointInstance);
if (error != SA_OK) {
- goto error_exit;
+ return (error);
}
req_lib_ckpt_checkpointstatusget.header.magic = MESSAGE_MAGIC;
req_lib_ckpt_checkpointstatusget.header.size = sizeof (struct req_lib_ckpt_checkpointstatusget);
req_lib_ckpt_checkpointstatusget.header.id = MESSAGE_REQ_CKPT_CHECKPOINT_CHECKPOINTSTATUSGET;
+ pthread_mutex_lock (&ckptCheckpointInstance->mutex);
+
error = saSendRetry (ckptCheckpointInstance->fd, &req_lib_ckpt_checkpointstatusget,
sizeof (struct req_lib_ckpt_checkpointstatusget), MSG_NOSIGNAL);
if (error != SA_OK) {
@@ -638,11 +688,14 @@
goto error_exit;
}
+ pthread_mutex_unlock (&ckptCheckpointInstance->mutex);
+
memcpy (checkpointStatus,
&res_lib_ckpt_checkpointstatusget.checkpointStatus,
sizeof (SaCkptCheckpointStatusT));
error_exit:
+ saHandlePut (&ckptCheckpointHandleDatabase, *checkpointHandle);
return (error);
}
@@ -658,10 +711,10 @@
struct req_lib_ckpt_sectioncreate req_lib_ckpt_sectioncreate;
struct res_lib_ckpt_sectioncreate res_lib_ckpt_sectioncreate;
- error = saHandleConvert (&ckptCheckpointHandleDatabase, *checkpointHandle,
- (void *)&ckptCheckpointInstance, CKPTCHECKPOINTINSTANCE_MUTEX_OFFSET, 0);
+ error = saHandleGet (&ckptCheckpointHandleDatabase, *checkpointHandle,
+ (void *)&ckptCheckpointInstance);
if (error != SA_OK) {
- goto error_exit;
+ return (error);
}
req_lib_ckpt_sectioncreate.header.magic = MESSAGE_MAGIC;
@@ -677,6 +730,8 @@
req_lib_ckpt_sectioncreate.initialDataSize = initialDataSize;
+ pthread_mutex_lock (&ckptCheckpointInstance->mutex);
+
error = saSendRetry (ckptCheckpointInstance->fd, &req_lib_ckpt_sectioncreate,
sizeof (struct req_lib_ckpt_sectioncreate), MSG_NOSIGNAL);
if (error != SA_OK) {
@@ -698,9 +753,14 @@
goto error_exit;
}
- error = saRecvQueue (ckptCheckpointInstance->fd, &res_lib_ckpt_sectioncreate, 0, MESSAGE_RES_CKPT_CHECKPOINT_SECTIONCREATE);
+ error = saRecvQueue (ckptCheckpointInstance->fd,
+ &res_lib_ckpt_sectioncreate, 0,
+ MESSAGE_RES_CKPT_CHECKPOINT_SECTIONCREATE);
+
+ pthread_mutex_unlock (&ckptCheckpointInstance->mutex);
error_exit:
+ saHandlePut (&ckptCheckpointHandleDatabase, *checkpointHandle);
return (error == SA_OK ? res_lib_ckpt_sectioncreate.error : error);
}
@@ -715,12 +775,14 @@
struct req_lib_ckpt_sectiondelete req_lib_ckpt_sectiondelete;
struct res_lib_ckpt_sectiondelete res_lib_ckpt_sectiondelete;
- error = saHandleConvert (&ckptCheckpointHandleDatabase, *checkpointHandle,
- (void *)&ckptCheckpointInstance, CKPTCHECKPOINTINSTANCE_MUTEX_OFFSET, 0);
+ error = saHandleGet (&ckptCheckpointHandleDatabase, *checkpointHandle,
+ (void *)&ckptCheckpointInstance);
if (error != SA_OK) {
- goto error_exit;
+ return (error);
}
+ pthread_mutex_lock (&ckptCheckpointInstance->mutex);
+
req_lib_ckpt_sectiondelete.header.magic = MESSAGE_MAGIC;
req_lib_ckpt_sectiondelete.header.size = sizeof (struct req_lib_ckpt_sectiondelete) + sectionId->idLen;
req_lib_ckpt_sectiondelete.header.id = MESSAGE_REQ_CKPT_CHECKPOINT_SECTIONDELETE;
@@ -740,9 +802,14 @@
if (error != SA_OK) {
goto error_exit;
}
- error = saRecvQueue (ckptCheckpointInstance->fd, &res_lib_ckpt_sectiondelete, 0, MESSAGE_RES_CKPT_CHECKPOINT_SECTIONDELETE);
+ error = saRecvQueue (ckptCheckpointInstance->fd,
+ &res_lib_ckpt_sectiondelete, 0,
+ MESSAGE_RES_CKPT_CHECKPOINT_SECTIONDELETE);
+
+ pthread_mutex_unlock (&ckptCheckpointInstance->mutex);
error_exit:
+ saHandlePut (&ckptCheckpointHandleDatabase, *checkpointHandle);
return (error == SA_OK ? res_lib_ckpt_sectiondelete.error : error);
}
@@ -757,8 +824,8 @@
struct req_lib_ckpt_sectionexpirationtimeset req_lib_ckpt_sectionexpirationtimeset;
struct res_lib_ckpt_sectionexpirationtimeset res_lib_ckpt_sectionexpirationtimeset;
- error = saHandleConvert (&ckptCheckpointHandleDatabase, *checkpointHandle,
- (void *)&ckptCheckpointInstance, CKPTCHECKPOINTINSTANCE_MUTEX_OFFSET, 0);
+ error = saHandleGet (&ckptCheckpointHandleDatabase, *checkpointHandle,
+ (void *)&ckptCheckpointInstance);
if (error != SA_OK) {
goto error_exit;
}
@@ -769,6 +836,8 @@
req_lib_ckpt_sectionexpirationtimeset.idLen = sectionId->idLen;
req_lib_ckpt_sectionexpirationtimeset.expirationTime = expirationTime;
+ pthread_mutex_lock (&ckptCheckpointInstance->mutex);
+
error = saSendRetry (ckptCheckpointInstance->fd, &req_lib_ckpt_sectionexpirationtimeset,
sizeof (struct req_lib_ckpt_sectionexpirationtimeset), MSG_NOSIGNAL);
if (error != SA_OK) {
@@ -786,10 +855,14 @@
}
}
- error = saRecvQueue (ckptCheckpointInstance->fd, &res_lib_ckpt_sectionexpirationtimeset,
+ error = saRecvQueue (ckptCheckpointInstance->fd,
+ &res_lib_ckpt_sectionexpirationtimeset,
0, MESSAGE_RES_CKPT_CHECKPOINT_SECTIONEXPIRATIONTIMESET);
+ pthread_mutex_unlock (&ckptCheckpointInstance->mutex);
+
error_exit:
+ saHandlePut (&ckptCheckpointHandleDatabase, *checkpointHandle);
return (error == SA_OK ? res_lib_ckpt_sectionexpirationtimeset.error : error);
}
@@ -806,8 +879,8 @@
struct req_lib_ckpt_sectioniteratorinitialize req_lib_ckpt_sectioniteratorinitialize;
struct res_lib_ckpt_sectioniteratorinitialize res_lib_ckpt_sectioniteratorinitialize;
- error = saHandleConvert (&ckptCheckpointHandleDatabase, *checkpointHandle,
- (void *)&ckptCheckpointInstance, CKPTCHECKPOINTINSTANCE_MUTEX_OFFSET, 0);
+ error = saHandleGet (&ckptCheckpointHandleDatabase, *checkpointHandle,
+ (void *)&ckptCheckpointInstance);
if (error != SA_OK) {
goto error_exit;
}
@@ -818,6 +891,9 @@
if (error != SA_OK) {
goto error_exit;
}
+
+ pthread_mutex_init (&ckptSectionIteratorInstance->mutex, NULL);
+
/*
* Setup section id list for iterator next
*/
@@ -832,8 +908,6 @@
goto error_remove;
}
- pthread_mutex_init (&ckptSectionIteratorInstance->mutex, NULL);
-
req_lib_ckpt_sectioniteratorinitialize.header.magic = MESSAGE_MAGIC;
req_lib_ckpt_sectioniteratorinitialize.header.size = sizeof (struct req_lib_ckpt_sectioniteratorinitialize);
req_lib_ckpt_sectioniteratorinitialize.header.id = MESSAGE_REQ_CKPT_SECTIONITERATOR_SECTIONITERATORINITIALIZE;
@@ -842,8 +916,12 @@
memcpy (&req_lib_ckpt_sectioniteratorinitialize.checkpointName,
&ckptCheckpointInstance->checkpointName, sizeof (SaNameT));
+ pthread_mutex_lock (&ckptSectionIteratorInstance->mutex);
+
error = saSendRetry (ckptSectionIteratorInstance->fd,
- &req_lib_ckpt_sectioniteratorinitialize, sizeof (struct req_lib_ckpt_sectioniteratorinitialize), MSG_NOSIGNAL);
+ &req_lib_ckpt_sectioniteratorinitialize,
+ sizeof (struct req_lib_ckpt_sectioniteratorinitialize),
+ MSG_NOSIGNAL);
if (error != SA_OK) {
goto error_close;
@@ -853,10 +931,17 @@
&res_lib_ckpt_sectioniteratorinitialize, 0,
MESSAGE_RES_CKPT_SECTIONITERATOR_SECTIONITERATORINITIALIZE);
+ pthread_mutex_unlock (&ckptSectionIteratorInstance->mutex);
+
error_exit:
+ saHandlePut (&ckptCheckpointHandleDatabase, *checkpointHandle);
return (error == SA_OK ? res_lib_ckpt_sectioniteratorinitialize.error : error);
+
error_close:
+ saHandlePut (&ckptCheckpointHandleDatabase, *checkpointHandle);
+
close (ckptSectionIteratorInstance->fd);
+
error_remove:
saHandleRemove (&ckptSectionIteratorHandleDatabase, *sectionIterator);
return (error);
@@ -878,9 +963,8 @@
struct res_lib_ckpt_sectioniteratornext res_lib_ckpt_sectioniteratornext;
struct iteratorSectionIdListEntry *iteratorSectionIdListEntry;
- error = saHandleConvert (&ckptSectionIteratorHandleDatabase, *sectionIterator,
- (void *)&ckptSectionIteratorInstance,
- CKPTSECTIONITERATORINSTANCE_MUTEX_OFFSET, 0);
+ error = saHandleGet (&ckptSectionIteratorHandleDatabase, *sectionIterator,
+ (void *)&ckptSectionIteratorInstance);
if (error != SA_OK) {
goto error_exit;
}
@@ -898,6 +982,8 @@
req_lib_ckpt_sectioniteratornext.header.size = sizeof (struct req_lib_ckpt_sectioniteratornext);
req_lib_ckpt_sectioniteratornext.header.id = MESSAGE_REQ_CKPT_SECTIONITERATOR_SECTIONITERATORNEXT;
+ pthread_mutex_lock (&ckptSectionIteratorInstance->mutex);
+
error = saSendRetry (ckptSectionIteratorInstance->fd,
&req_lib_ckpt_sectioniteratornext,
sizeof (struct req_lib_ckpt_sectioniteratornext), MSG_NOSIGNAL);
@@ -934,7 +1020,9 @@
list_add (&iteratorSectionIdListEntry->list, &ckptSectionIteratorInstance->sectionIdListHead);
}
+ pthread_mutex_unlock (&ckptSectionIteratorInstance->mutex);
error_exit:
+ saHandlePut (&ckptSectionIteratorHandleDatabase, *sectionIterator);
return (error == SA_OK ? res_lib_ckpt_sectioniteratornext.error : error);
}
@@ -948,9 +1036,8 @@
struct list_head *sectionIdIteratorList;
struct list_head *sectionIdIteratorListNext;
- error = saHandleConvert (&ckptSectionIteratorHandleDatabase, *sectionIterator,
- (void *)&ckptSectionIteratorInstance,
- CKPTSECTIONITERATORINSTANCE_MUTEX_OFFSET, 0);
+ error = saHandleGet (&ckptSectionIteratorHandleDatabase, *sectionIterator,
+ (void *)&ckptSectionIteratorInstance);
if (error != SA_OK) {
goto error_exit;
}
@@ -973,6 +1060,15 @@
free (iteratorSectionIdListEntry);
}
+ saHandlePut (&ckptSectionIteratorHandleDatabase,
+ *sectionIterator);
+
+ saHandlePut (&ckptSectionIteratorHandleDatabase,
+ *sectionIterator);
+
+ saHandleWaitNoRefs (&ckptSectionIteratorHandleDatabase,
+ *sectionIterator);
+
saHandleRemove (&ckptSectionIteratorHandleDatabase, *sectionIterator);
error_exit:
@@ -994,16 +1090,18 @@
struct iovec iov[3];
int iov_len = 0;
- error = saHandleConvert (&ckptCheckpointHandleDatabase, *checkpointHandle,
- (void *)&ckptCheckpointInstance, CKPTCHECKPOINTINSTANCE_MUTEX_OFFSET_DEMO, 0);
+ error = saHandleGet (&ckptCheckpointHandleDatabase, *checkpointHandle,
+ (void *)&ckptCheckpointInstance);
if (error != SA_OK) {
- goto error_exit;
+ return (error);
}
req_lib_ckpt_sectionwrite.header.magic = MESSAGE_MAGIC;
req_lib_ckpt_sectionwrite.header.id = MESSAGE_REQ_CKPT_CHECKPOINT_SECTIONWRITE;
+ pthread_mutex_lock (&ckptCheckpointInstance->mutex);
+
for (i = 0; i < numberOfElements; i++) {
req_lib_ckpt_sectionwrite.header.size = sizeof (struct req_lib_ckpt_sectionwrite) + ioVector[i].sectionId.idLen + ioVector[i].dataSize;
@@ -1050,6 +1148,10 @@
error_exit:
pthread_mutex_unlock (&ckptCheckpointInstance->mutex);
+
+ saHandlePut (&ckptCheckpointHandleDatabase,
+ *checkpointHandle);
+
return (error == SA_OK ? res_lib_ckpt_sectionwrite.error : error);
}
@@ -1065,10 +1167,10 @@
struct req_lib_ckpt_sectionoverwrite req_lib_ckpt_sectionoverwrite;
struct res_lib_ckpt_sectionoverwrite res_lib_ckpt_sectionoverwrite;
- error = saHandleConvert (&ckptCheckpointHandleDatabase, *checkpointHandle,
- (void *)&ckptCheckpointInstance, CKPTCHECKPOINTINSTANCE_MUTEX_OFFSET, 0);
+ error = saHandleGet (&ckptCheckpointHandleDatabase, *checkpointHandle,
+ (void *)&ckptCheckpointInstance);
if (error != SA_OK) {
- goto error_exit;
+ return (error);
}
req_lib_ckpt_sectionoverwrite.header.magic = MESSAGE_MAGIC;
@@ -1077,6 +1179,8 @@
req_lib_ckpt_sectionoverwrite.idLen = sectionId->idLen;
req_lib_ckpt_sectionoverwrite.dataSize = dataSize;
+ pthread_mutex_lock (&ckptCheckpointInstance->mutex);
+
error = saSendRetry (ckptCheckpointInstance->fd, &req_lib_ckpt_sectionoverwrite,
sizeof (struct req_lib_ckpt_sectionoverwrite), MSG_NOSIGNAL);
if (error != SA_OK) {
@@ -1099,6 +1203,11 @@
&res_lib_ckpt_sectionoverwrite, 0, MESSAGE_RES_CKPT_CHECKPOINT_SECTIONOVERWRITE);
error_exit:
+ pthread_mutex_unlock (&ckptCheckpointInstance->mutex);
+
+ saHandlePut (&ckptCheckpointHandleDatabase,
+ *checkpointHandle);
+
return (error == SA_OK ? res_lib_ckpt_sectionoverwrite.error : error);
}
@@ -1117,15 +1226,17 @@
int i;
struct iovec iov[3];
- error = saHandleConvert (&ckptCheckpointHandleDatabase, *checkpointHandle,
- (void *)&ckptCheckpointInstance, CKPTCHECKPOINTINSTANCE_MUTEX_OFFSET_DEMO, 0);
+ error = saHandleGet (&ckptCheckpointHandleDatabase, *checkpointHandle,
+ (void *)&ckptCheckpointInstance);
if (error != SA_OK) {
- goto error_exit;
+ return (error);
}
req_lib_ckpt_sectionread.header.magic = MESSAGE_MAGIC;
req_lib_ckpt_sectionread.header.id = MESSAGE_REQ_CKPT_CHECKPOINT_SECTIONREAD;
+ pthread_mutex_lock (&ckptCheckpointInstance->mutex);
+
for (i = 0; i < numberOfElements; i++) {
req_lib_ckpt_sectionread.header.size = sizeof (struct req_lib_ckpt_sectionread) +
ioVector[i].sectionId.idLen;
@@ -1179,6 +1290,10 @@
error_exit:
pthread_mutex_unlock (&ckptCheckpointInstance->mutex);
+
+ saHandlePut (&ckptCheckpointHandleDatabase,
+ *checkpointHandle);
+
return (error == SA_OK ? res_lib_ckpt_sectionread.error : error);
}
@@ -1192,16 +1307,18 @@
struct req_lib_ckpt_checkpointsynchronize req_lib_ckpt_checkpointsynchronize;
struct res_lib_ckpt_checkpointsynchronize res_lib_ckpt_checkpointsynchronize;
- error = saHandleConvert (&ckptCheckpointHandleDatabase, *checkpointHandle,
- (void *)&ckptCheckpointInstance, CKPTCHECKPOINTINSTANCE_MUTEX_OFFSET, 0);
+ error = saHandleGet (&ckptCheckpointHandleDatabase, *checkpointHandle,
+ (void *)&ckptCheckpointInstance);
if (error != SA_OK) {
- goto error_exit;
+ return (error);
}
req_lib_ckpt_checkpointsynchronize.header.magic = MESSAGE_MAGIC;
req_lib_ckpt_checkpointsynchronize.header.size = sizeof (struct req_lib_ckpt_checkpointsynchronize);
req_lib_ckpt_checkpointsynchronize.header.id = MESSAGE_REQ_CKPT_CHECKPOINT_CHECKPOINTSYNCHRONIZE;
+ pthread_mutex_lock (&ckptCheckpointInstance->mutex);
+
error = saSendRetry (ckptCheckpointInstance->fd, &req_lib_ckpt_checkpointsynchronize,
sizeof (struct req_lib_ckpt_checkpointsynchronize), MSG_NOSIGNAL);
@@ -1213,6 +1330,11 @@
0, MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTSYNCHRONIZE);
error_exit:
+ pthread_mutex_unlock (&ckptCheckpointInstance->mutex);
+
+ saHandlePut (&ckptCheckpointHandleDatabase,
+ *checkpointHandle);
+
return (error == SA_OK ? res_lib_ckpt_checkpointsynchronize.error : error);
}
@@ -1227,15 +1349,18 @@
SaErrorT error;
struct req_lib_ckpt_checkpointsynchronizeasync req_lib_ckpt_checkpointsynchronizeasync;
- error = saHandleConvert (&ckptCheckpointHandleDatabase, *checkpointHandle,
- (void *)&ckptCheckpointInstance, CKPTCHECKPOINTINSTANCE_MUTEX_OFFSET, 0);
+ error = saHandleGet (&ckptCheckpointHandleDatabase, *checkpointHandle,
+ (void *)&ckptCheckpointInstance);
if (error != SA_OK) {
- goto error_exit;
+
+ return (error);
}
- error = saHandleConvert (&ckptHandleDatabase, *ckptHandle,
- (void *)&ckptInstance, CKPTINSTANCE_MUTEX_OFFSET, 0);
+
+ error = saHandleGet (&ckptHandleDatabase, *ckptHandle, (void *)&ckptInstance);
if (error != SA_OK) {
- goto error_exit;
+ saHandlePut (&ckptCheckpointHandleDatabase,
+ *checkpointHandle);
+ return (error);
}
req_lib_ckpt_checkpointsynchronizeasync.header.magic = MESSAGE_MAGIC;
@@ -1243,9 +1368,19 @@
req_lib_ckpt_checkpointsynchronizeasync.header.id = MESSAGE_REQ_CKPT_CHECKPOINT_CHECKPOINTOPENASYNC;
req_lib_ckpt_checkpointsynchronizeasync.invocation = invocation;
+ pthread_mutex_lock (&ckptCheckpointInstance->mutex);
+
+ pthread_mutex_lock (&ckptInstance->mutex);
+
error = saSendRetry (ckptInstance->fd, &req_lib_ckpt_checkpointsynchronizeasync,
sizeof (struct req_lib_ckpt_checkpointsynchronizeasync), MSG_NOSIGNAL);
-error_exit:
+ pthread_mutex_unlock (&ckptInstance->mutex);
+
+ pthread_mutex_unlock (&ckptCheckpointInstance->mutex);
+
+ saHandlePut (&ckptCheckpointHandleDatabase,
+ *checkpointHandle);
+
return (error);
}
--- openais-cond-orig/lib/clm.c 2004-06-28 18:30:33.000000000 -0700
+++ openais-cond/lib/clm.c 2004-06-28 18:39:04.000000000 -0700
@@ -57,16 +57,18 @@
struct clmInstance {
int fd;
SaClmCallbacksT callbacks;
- struct message_overlay message;
+ int finalize;
pthread_mutex_t mutex;
+ struct saRefCount refCount;
};
-#define CLMINSTANCE_MUTEX_OFFSET offset_of(struct clmInstance, mutex)
+
+#define CLMINSTANCE_REFCOUNT_OFFSET offset_of(struct clmInstance, refCount)
static struct saHandleDatabase clmHandleDatabase = {
handleCount: 0,
handles: 0,
- generation: 0,
- mutex: PTHREAD_MUTEX_INITIALIZER
+ mutex: PTHREAD_MUTEX_INITIALIZER,
+ offsetToRefCount: CLMINSTANCE_REFCOUNT_OFFSET
};
/*
@@ -128,14 +130,14 @@
struct clmInstance *clmInstance;
SaErrorT error;
- error = saHandleConvert (&clmHandleDatabase, *clmHandle, (void *)&clmInstance, CLMINSTANCE_MUTEX_OFFSET, 0);
+ error = saHandleGet (&clmHandleDatabase, *clmHandle, (void *)&clmInstance);
if (error != SA_OK) {
return (error);
}
*selectionObject = clmInstance->fd;
- pthread_mutex_unlock (&clmInstance->mutex);
+ saHandlePut (&clmHandleDatabase, *clmHandle);
return (SA_OK);
}
@@ -150,15 +152,17 @@
int cont = 1; /* always continue do loop except when set to 0 */
int dispatch_avail;
int poll_fd;
- int handle_verified = 0;
struct clmInstance *clmInstance;
struct res_clm_trackcallback *res_clm_trackcallback;
struct res_clm_nodegetcallback *res_clm_nodegetcallback;
SaClmCallbacksT callbacks;
- unsigned int gen_first;
- unsigned int gen_second;
struct message_overlay dispatch_data;
+ error = saHandleGet (&clmHandleDatabase, *clmHandle, (void *)&clmInstance);
+ if (error != SA_OK) {
+ return (error);
+ }
+
/*
* Timeout instantly for SA_DISPATCH_ONE or SA_DISPATCH_ALL and
* wait indefinately for SA_DISPATCH_BLOCKING
@@ -168,20 +172,9 @@
}
do {
- error = saHandleConvert (&clmHandleDatabase, *clmHandle, (void *)&clmInstance, CLMINSTANCE_MUTEX_OFFSET, &gen_first);
- if (error != SA_OK) {
- return (handle_verified ? SA_OK : error);
- }
- handle_verified = 1;
-
poll_fd = clmInstance->fd;
- /*
- * Unlock mutex for potentially long wait in select. If fd
- * is closed by clmFinalize in select, select will return
- */
-
- pthread_mutex_unlock (&clmInstance->mutex);
+ saHandlePut (&clmHandleDatabase, *clmHandle);
ufds.fd = poll_fd;
ufds.events = POLLIN;
@@ -192,32 +185,32 @@
goto error_nounlock;
}
- dispatch_avail = ufds.revents & POLLIN;
- if (dispatch_avail == 0 && dispatchFlags == SA_DISPATCH_ALL) {
- break; /* exit do while cont is 1 loop */
- }
- if (dispatch_avail == 0) {
- continue; /* retry select */
- }
+ pthread_mutex_lock (&clmInstance->mutex);
+
/*
- * Re-verify amfHandle
+ * Handle has been finalized in another thread
*/
- error = saHandleConvert (&clmHandleDatabase, *clmHandle, (void *)&clmInstance, CLMINSTANCE_MUTEX_OFFSET, &gen_second);
- if (error != SA_OK) {
- return (handle_verified ? SA_OK : error);
+ if (clmInstance->finalize == 1) {
+ error = SA_OK;
+ pthread_mutex_unlock (&clmInstance->mutex);
+ goto error_unlock;
}
- /*
- * Handle has been removed and then reallocated
- */
- if (gen_first != gen_second) {
- return SA_OK;
+ dispatch_avail = ufds.revents & POLLIN;
+ if (dispatch_avail == 0 && dispatchFlags == SA_DISPATCH_ALL) {
+ pthread_mutex_unlock (&clmInstance->mutex);
+ break; /* exit do while cont is 1 loop */
+ } else
+ if (dispatch_avail == 0) {
+ pthread_mutex_unlock (&clmInstance->mutex);
+ continue; /* next poll */
}
/*
* Read header
*/
- error = saRecvRetry (clmInstance->fd, &clmInstance->message.header, sizeof (struct message_header), MSG_WAITALL | MSG_NOSIGNAL);
+ error = saRecvRetry (clmInstance->fd, &dispatch_data.header,
+ sizeof (struct message_header), MSG_WAITALL | MSG_NOSIGNAL);
if (error != SA_OK) {
goto error_unlock;
}
@@ -225,9 +218,9 @@
/*
* Read data payload
*/
- if (clmInstance->message.header.size > sizeof (struct message_header)) {
- error = saRecvRetry (clmInstance->fd, &clmInstance->message.data,
- clmInstance->message.header.size - sizeof (struct message_header), MSG_WAITALL | MSG_NOSIGNAL);
+ if (dispatch_data.header.size > sizeof (struct message_header)) {
+ error = saRecvRetry (clmInstance->fd, &dispatch_data.data,
+ dispatch_data.header.size - sizeof (struct message_header), MSG_WAITALL | MSG_NOSIGNAL);
if (error != SA_OK) {
goto error_unlock;
}
@@ -238,15 +231,12 @@
* operate at the same time that amfFinalize has been called.
*/
memcpy (&callbacks, &clmInstance->callbacks, sizeof (SaClmCallbacksT));
- memcpy (&dispatch_data, &clmInstance->message, sizeof (struct message_overlay));
-
pthread_mutex_unlock (&clmInstance->mutex);
-
/*
* Dispatch incoming message
*/
- switch (clmInstance->message.header.id) {
+ switch (dispatch_data.header.id) {
case MESSAGE_RES_CLM_TRACKCALLBACK:
res_clm_trackcallback = (struct res_clm_trackcallback *)&dispatch_data;
@@ -277,6 +267,7 @@
goto error_nounlock;
break;
}
+
/*
* Determine if more messages should be processed
* */
@@ -291,10 +282,8 @@
}
} while (cont);
- return (error);
-
error_unlock:
- pthread_mutex_unlock (&clmInstance->mutex);
+ saHandlePut (&clmHandleDatabase, *clmHandle);
error_nounlock:
return (error);
}
@@ -306,20 +295,24 @@
struct clmInstance *clmInstance;
SaErrorT error;
- error = saHandleConvert (&clmHandleDatabase, *clmHandle, (void *)&clmInstance, CLMINSTANCE_MUTEX_OFFSET | HANDLECONVERT_DONTUNLOCKDB, 0);
+ error = saHandleGet (&clmHandleDatabase, *clmHandle, (void *)&clmInstance);
if (error != SA_OK) {
return (error);
}
shutdown (clmInstance->fd, 0);
+
close (clmInstance->fd);
- free (clmInstance);
- error = saHandleRemove (&clmHandleDatabase, *clmHandle);
+ clmInstance->finalize = 1;
- pthread_mutex_unlock (&clmInstance->mutex);
+ saHandlePut (&clmHandleDatabase, *clmHandle);
+
+ saHandlePut (&clmHandleDatabase, *clmHandle);
- saHandleUnlockDatabase (&clmHandleDatabase);
+ saHandleWaitNoRefs (&clmHandleDatabase, *clmHandle);
+
+ error = saHandleRemove (&clmHandleDatabase, *clmHandle);
return (error);
}
@@ -342,15 +335,19 @@
req_trackstart.notificationBufferAddress = notificationBuffer;
req_trackstart.numberOfItems = numberOfItems;
- error = saHandleConvert (&clmHandleDatabase, *clmHandle, (void *)&clmInstance, CLMINSTANCE_MUTEX_OFFSET, 0);
+ error = saHandleGet (&clmHandleDatabase, *clmHandle, (void *)&clmInstance);
if (error != SA_OK) {
return (error);
}
+ pthread_mutex_lock (&clmInstance->mutex);
+
error = saSendRetry (clmInstance->fd, &req_trackstart, sizeof (struct req_clm_trackstart), MSG_NOSIGNAL);
pthread_mutex_unlock (&clmInstance->mutex);
+ saHandlePut (&clmHandleDatabase, *clmHandle);
+
return (error);
}
@@ -366,15 +363,19 @@
req_trackstop.header.size = sizeof (struct req_clm_trackstop);
req_trackstop.header.id = MESSAGE_REQ_CLM_TRACKSTOP;
- error = saHandleConvert (&clmHandleDatabase, *clmHandle, (void *)&clmInstance, CLMINSTANCE_MUTEX_OFFSET, 0);
+ error = saHandleGet (&clmHandleDatabase, *clmHandle, (void *)&clmInstance);
if (error != SA_OK) {
return (error);
}
+ pthread_mutex_lock (&clmInstance->mutex);
+
error = saSendRetry (clmInstance->fd, &req_trackstop, sizeof (struct req_clm_trackstop), MSG_NOSIGNAL);
pthread_mutex_unlock (&clmInstance->mutex);
+ saHandlePut (&clmHandleDatabase, *clmHandle);
+
return (error);
}
@@ -466,14 +467,19 @@
memcpy (&req_clm_nodeget.nodeId, &nodeId, sizeof (SaClmNodeIdT));
req_clm_nodeget.clusterNodeAddress = clusterNode;
- error = saHandleConvert (&clmHandleDatabase, *clmHandle, (void *)&clmInstance, CLMINSTANCE_MUTEX_OFFSET, 0);
+ error = saHandleGet (&clmHandleDatabase, *clmHandle, (void *)&clmInstance);
if (error != SA_OK) {
return (error);
}
- error = saSendRetry (clmInstance->fd, &req_clm_nodeget, sizeof (struct req_clm_nodeget), MSG_NOSIGNAL);
+ pthread_mutex_lock (&clmInstance->mutex);
+
+ error = saSendRetry (clmInstance->fd, &req_clm_nodeget,
+ sizeof (struct req_clm_nodeget), MSG_NOSIGNAL);
pthread_mutex_unlock (&clmInstance->mutex);
+ saHandlePut (&clmHandleDatabase, *clmHandle);
+
return (error);
}
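
The clm.c dispatch change above reads the incoming message into a stack-local buffer (dispatch_data), copies the callbacks while the instance mutex is held, and drops the mutex before the callback runs. A minimal sketch of that idea follows, assuming a default non-recursive mutex. Every name in it (struct message, struct instance, api_call, example_callback, dispatch_one) is hypothetical and is not part of openais.

```c
#include <assert.h>
#include <pthread.h>
#include <string.h>

/* All names below are hypothetical; none of them are openais types. */
struct message {
	int id;
	char data[64];
};

struct instance;
typedef void (*message_callback_t) (struct instance *inst,
	const struct message *msg);

struct instance {
	pthread_mutex_t mutex;
	message_callback_t callback;
	struct message pending;	/* stands in for data read from the fd */
	int finalize;		/* set once the handle has been finalized */
	int api_calls;		/* bookkeeping for the example only */
	int last_id;
};

/* An API call that takes the instance mutex while it works. */
int api_call (struct instance *inst, int id)
{
	pthread_mutex_lock (&inst->mutex);
	inst->api_calls++;
	inst->last_id = id;
	pthread_mutex_unlock (&inst->mutex);
	return (0);
}

/* A user callback that calls back into the API. */
void example_callback (struct instance *inst, const struct message *msg)
{
	api_call (inst, msg->id);
}

/*
 * Dispatch one message. Returns 1 if the callback path ran and 0 if
 * the instance had already been finalized.
 */
int dispatch_one (struct instance *inst)
{
	struct message local;
	message_callback_t callback;

	assert (inst != NULL);

	pthread_mutex_lock (&inst->mutex);
	if (inst->finalize) {
		pthread_mutex_unlock (&inst->mutex);
		return (0);
	}
	/* Copy everything the callback needs while the mutex is held. */
	memcpy (&local, &inst->pending, sizeof (local));
	callback = inst->callback;
	pthread_mutex_unlock (&inst->mutex);

	/* The mutex is not held here, so the callback may lock it again. */
	if (callback) {
		callback (inst, &local);
	}
	return (1);
}
```

If dispatch_one invoked the callback before unlocking, api_call would block on the mutex its own thread already holds. Copying first and unlocking avoids that.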
--- openais-cond-orig/lib/util.c 2004-06-28 18:30:33.000000000 -0700
+++ openais-cond/lib/util.c 2004-06-28 18:05:56.000000000 -0700
@@ -350,6 +350,7 @@
void *newHandles;
int found = 0;
void *instance;
+ struct saRefCount *refCount;
pthread_mutex_lock (&handleDatabase->mutex);
@@ -377,11 +378,18 @@
handleDatabase->handles[handle].valid = 1;
handleDatabase->handles[handle].instance = instance;
- handleDatabase->handles[handle].generation = handleDatabase->generation++;
*handleOut = handle;
*instanceOut = instance;
+ /*
+ * Initialize reference count information
+ */
+ refCount = instance + handleDatabase->offsetToRefCount;
+ pthread_mutex_init (&refCount->mutex, NULL);
+ pthread_cond_init (&refCount->cond, NULL);
+ refCount->refCount = 1;
+
pthread_mutex_unlock (&handleDatabase->mutex);
return (SA_OK);
}
@@ -398,57 +406,75 @@
}
SaErrorT
-saHandleConvert (
+saHandleGet (
struct saHandleDatabase *handleDatabase,
unsigned int handle,
- void **instance,
- int offsetToMutex,
- unsigned int *generationOut)
+ void **instance)
{
SaErrorT error;
- int unlockDatabase;
- int locking;
-
- unlockDatabase = (0 == (offsetToMutex & HANDLECONVERT_DONTUNLOCKDB));
- locking = (0 == (offsetToMutex & HANDLECONVERT_NOLOCKING));
- offsetToMutex &= 0x00ffffff; /* remove 8 bits of flags */
+ struct saRefCount *refCount;
- if (locking) {
- pthread_mutex_lock (&handleDatabase->mutex);
- }
+ pthread_mutex_lock (&handleDatabase->mutex);
error = saHandleVerify (handleDatabase, handle);
if (error != SA_OK) {
- if (locking) {
- pthread_mutex_unlock (&handleDatabase->mutex);
- }
- return (error);
+ goto exit_error;
}
*instance = handleDatabase->handles[handle].instance;
- if (generationOut) {
- *generationOut = handleDatabase->handles[handle].generation;
- }
+
+ refCount = *instance + handleDatabase->offsetToRefCount;
/*
- * This function exits holding the mutex in the instance instance
- * pointed to by offsetToMutex (if NOLOCKING isn't set)
+ * Replace with atomic increment
*/
- if (locking) {
- pthread_mutex_lock ((pthread_mutex_t *)(*instance + offsetToMutex));
- if (unlockDatabase) {
- pthread_mutex_unlock (&handleDatabase->mutex);
- }
+ pthread_mutex_lock (&refCount->mutex);
+ if (refCount->refCount == 0) {
+ error = SA_ERR_BAD_HANDLE;
+ *instance = 0;
+ } else {
+ refCount->refCount++;
}
+ pthread_mutex_unlock (&refCount->mutex);
+
+exit_error:
+ pthread_mutex_unlock (&handleDatabase->mutex);
+ return (error);
+}
+
+SaErrorT
+saHandlePut (
+ struct saHandleDatabase *handleDatabase,
+ unsigned int handle)
+{
+ struct saRefCount *refCount;
+ refCount = handleDatabase->handles[handle].instance +
+ handleDatabase->offsetToRefCount;
+
+ pthread_mutex_lock (&refCount->mutex);
+ refCount->refCount -= 1;
+ if (refCount->refCount == 0) {
+ pthread_cond_broadcast (&refCount->cond);
+ }
+ pthread_mutex_unlock (&refCount->mutex);
return (SA_OK);
}
SaErrorT
-saHandleUnlockDatabase (
- struct saHandleDatabase *handleDatabase)
+saHandleWaitNoRefs (
+ struct saHandleDatabase *handleDatabase,
+ unsigned int handle)
{
- pthread_mutex_unlock (&handleDatabase->mutex);
+ struct saRefCount *refCount;
+ refCount = handleDatabase->handles[handle].instance +
+ handleDatabase->offsetToRefCount;
+
+ pthread_mutex_lock (&refCount->mutex);
+ while (refCount->refCount) {
+ pthread_cond_wait (&refCount->cond, &refCount->mutex);
+ }
+ pthread_mutex_unlock (&refCount->mutex);
return (SA_OK);
}
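
As a rough illustration of the three design notes, here is a standalone sketch of the same reference counting scheme with the handle database left out. The names (struct ref_count, ref_init, ref_get, ref_put, ref_wait_no_refs, ref_finalize) are hypothetical stand-ins for struct saRefCount and the saHandle* calls, not the patch itself. The sketch re-checks the count in a while loop around pthread_cond_wait, since POSIX allows spurious wakeups.

```c
#include <assert.h>
#include <pthread.h>

/* Hypothetical stand-in for the patch's struct saRefCount. */
struct ref_count {
	pthread_mutex_t mutex;
	pthread_cond_t cond;
	int count;
};

/* Create: the creator holds the first reference (design note 1). */
void ref_init (struct ref_count *rc)
{
	pthread_mutex_init (&rc->mutex, NULL);
	pthread_cond_init (&rc->cond, NULL);
	rc->count = 1;
}

/*
 * Get: returns 0 on success, or -1 if the count has already dropped
 * to zero, so a dying object cannot be revived (design note 2).
 */
int ref_get (struct ref_count *rc)
{
	int res = 0;

	pthread_mutex_lock (&rc->mutex);
	if (rc->count == 0) {
		res = -1;
	} else {
		rc->count++;
	}
	pthread_mutex_unlock (&rc->mutex);
	return (res);
}

/* Put: drop one reference and wake any waiter when the last one goes. */
void ref_put (struct ref_count *rc)
{
	pthread_mutex_lock (&rc->mutex);
	assert (rc->count > 0);
	rc->count--;
	if (rc->count == 0) {
		pthread_cond_broadcast (&rc->cond);
	}
	pthread_mutex_unlock (&rc->mutex);
}

/* Wait: block until no references remain, re-checking after each wakeup. */
void ref_wait_no_refs (struct ref_count *rc)
{
	pthread_mutex_lock (&rc->mutex);
	while (rc->count != 0) {
		pthread_cond_wait (&rc->cond, &rc->mutex);
	}
	pthread_mutex_unlock (&rc->mutex);
}

/* Finalize sequence from design note 3. Returns 0 on success. */
int ref_finalize (struct ref_count *rc)
{
	if (ref_get (rc) != 0) {
		return (-1);
	}
	ref_put (rc);		/* release the get just above */
	ref_put (rc);		/* release the reference taken at creation */
	ref_wait_no_refs (rc);	/* other users, if any, drain here */
	return (0);		/* now safe to free the instance */
}
```

Once ref_finalize returns, the count is zero and any later ref_get fails, so the caller can remove the handle and free the instance without racing other threads.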