[Openais] [PATCH 1/2] CTS: add service load/unload test
Angus Salkeld
asalkeld at redhat.com
Wed Mar 17 19:43:30 PDT 2010
This just tests that you can unload and load a service
and that the service objects are removed on unloading.
(evs doesn't unload - next patch)
-Angus
Signed-off-by: Angus Salkeld <asalkeld at redhat.com>
---
cts/corotests.py | 81 +++++++++++++++++++++++++++++++++++++++++++++++++++++-
1 files changed, 80 insertions(+), 1 deletions(-)
diff --git a/cts/corotests.py b/cts/corotests.py
index 6c53a6c..d07eb5f 100644
--- a/cts/corotests.py
+++ b/cts/corotests.py
@@ -180,7 +180,7 @@ class CpgCfgChgOnExecCrash(CpgConfigChangeBase):
def failure_action(self):
self.CM.log("sending SIGSEGV to corosync on " + self.wobbly)
- self.CM.rsh(self.wobbly, "killall -SIGSEGV corosync")
+ self.CM.rsh(self.wobbly, "killall -9 corosync")
self.CM.rsh(self.wobbly, "rm -f /var/run/corosync.pid")
def __call__(self, node):
@@ -428,9 +428,88 @@ class MemLeakSession(CoroTest):
else:
return self.failure(str(mem_leaked) + 'kB memory leaked.')
+###################################################################
+class ServiceLoadTest(CoroTest):
+ '''
+ Test loading and unloading of service engines
+ '''
+ def __init__(self, cm):
+ CoroTest.__init__(self, cm)
+ self.name="ServiceLoadTest"
+
+ def is_loaded(self, node):
+ check = 'corosync-objctl runtime.services. | grep evs'
+
+ (res, out) = self.CM.rsh(node, check, stdout=2)
+ if res is 0:
+ return True
+ else:
+ return False
+
+ def service_unload(self, node):
+ # unload evs
+ pats = []
+ pats.append("%s .*Service engine unloaded: corosync extended.*" % node)
+ unloaded = self.create_watch(pats, 60)
+ unloaded.setwatch()
+
+ self.CM.rsh(node, 'corosync-cfgtool -u corosync_evs')
+
+ if not unloaded.lookforall():
+ self.CM.log("Patterns not found: " + repr(unloaded.unmatched))
+ self.error_message = "evs service not unloaded"
+ return False
+
+ if self.is_loaded(node):
+ self.error_message = "evs has been unload, why are it's session objects are still there?"
+ return False
+ return True
+
+ def service_load(self, node):
+ # now reload it.
+ pats = []
+ pats.append("%s .*Service engine loaded.*" % node)
+ loaded = self.create_watch(pats, 60)
+ loaded.setwatch()
+
+ self.CM.rsh(node, 'corosync-cfgtool -l corosync_evs')
+
+ if not loaded.lookforall():
+ self.CM.log("Patterns not found: " + repr(loaded.unmatched))
+ self.error_message = "evs service not unloaded"
+ return False
+
+ return True
+
+
+ def __call__(self, node):
+ self.incr("calls")
+ should_be_loaded = True
+
+ if self.is_loaded(node):
+ ret = self.service_unload(node)
+ should_be_loaded = False
+ else:
+ ret = self.service_load(node)
+ should_be_loaded = True
+
+ if not ret:
+ return self.failure(self.error_message)
+
+ if self.is_loaded(node):
+ ret = self.service_unload(node)
+ else:
+ ret = self.service_load(node)
+
+ if not ret:
+ return self.failure(self.error_message)
+
+ return self.success()
+
AllTestClasses = []
+AllTestClasses.append(ServiceLoadTest)
AllTestClasses.append(CpgMsgOrderBasic)
AllTestClasses.append(CpgMsgOrderThreads)
AllTestClasses.append(CpgMsgOrderSecNss)
--
1.6.6.1
More information about the Openais
mailing list