package org.jboss.test.cluster.test;

import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Vector;
import java.util.List;
import java.util.HashSet;

import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
import javax.management.MBeanServerInvocationHandler;
import javax.management.ObjectName;
import javax.management.Notification;

import junit.framework.Test;

import org.jboss.test.JBossClusteredTestCase;
import org.jboss.test.cluster.drm.IReplicants;
import org.jboss.test.cluster.drm.MockHAPartition;
import org.jboss.ha.framework.interfaces.ClusterNode;
import org.jboss.ha.framework.interfaces.DistributedReplicantManager;
import org.jboss.ha.framework.interfaces.DistributedReplicantManager.ReplicantListener;
import org.jboss.ha.framework.server.DistributedReplicantManagerImpl;
import org.jboss.jmx.adaptor.rmi.RMIAdaptor;
import org.jboss.jmx.adaptor.rmi.RMIAdaptorExt;
import org.jboss.jmx.adaptor.rmi.RMINotificationListener;
import org.jboss.logging.Logger;
import org.jgroups.stack.IpAddress;

import EDU.oswego.cs.dl.util.concurrent.Semaphore;

/**
 * Tests of the DistributedReplicantManager (DRM).
 *
 * <p>{@link #testStateReplication()} talks to two running servers through
 * their RMI adaptors. The remaining tests drive a
 * <code>DistributedReplicantManagerImpl</code> built on top of a
 * <code>MockHAPartition</code> inside this JVM.
 */
public class DRMTestCase extends JBossClusteredTestCase
{
   /**
    * Remote notification listener that simply logs every notification it
    * receives.
    */
   static class TestListener extends UnicastRemoteObject
      implements RMINotificationListener
   {
      private static final long serialVersionUID = 1;
      private Logger log;

      public TestListener(Logger log) throws RemoteException
      {
         this.log = log;
      }

      public void handleNotification(Notification notification, Object handback)
         throws RemoteException
      {
         log.info("handleNotification, "+notification);
      }
   }

   /**
    * Thread that performs one add or remove on the DRM for key "TEST" and
    * that is also registered as the listener for that key. The listener
    * callback blocks on the shared static {@link #lock}, so a test holding
    * that lock can observe (via {@link #isBlocking()}) that the callback has
    * been entered and has not yet returned.
    */
   static class BlockingListenerThread extends Thread
      implements DistributedReplicantManager.ReplicantListener
   {
      private DistributedReplicantManagerImpl drm;
      private String nodeName;
      private boolean add;
      // Written by whichever thread delivers the callback / runs the thread,
      // polled by the test thread; volatile so the updates are visible.
      private volatile boolean blocking;
      private volatile Exception ex;

      /**
       * @param drm      the DRM to operate on; this object is registered with
       *                 it as listener for key "TEST"
       * @param add      <code>true</code> to add a replicant in run(),
       *                 <code>false</code> to remove it
       * @param nodeName <code>null</code> to use the local add/remove API,
       *                 otherwise the node name passed to _add/_remove
       */
      BlockingListenerThread(DistributedReplicantManagerImpl drm,
                             boolean add,
                             String nodeName)
      {
         this.drm = drm;
         this.add = add;
         this.nodeName = nodeName;
         drm.registerListener("TEST", this);
      }

      public void replicantsChanged(String key, List newReplicants, int newReplicantsViewId)
      {
         blocking = true;
         // Parks here for as long as the test thread holds the lock
         synchronized (lock)
         {
            blocking = false;
         }
      }

      public void run()
      {
         try
         {
            if (add)
            {
               if (nodeName == null)
                  drm.add("TEST", "local-replicant");
               else
                  drm._add("TEST", nodeName, "remote-replicant");
            }
            else
            {
               if (nodeName == null)
                  drm.remove("TEST");
               else
                  drm._remove("TEST", nodeName);
            }
         }
         catch (Exception e)
         {
            ex = e;
         }
      }

      /** @return <code>true</code> while the listener callback is waiting for the lock */
      public boolean isBlocking()
      {
         return blocking;
      }

      /** @return any exception caught in run(), or <code>null</code> */
      public Exception getException()
      {
         return ex;
      }

   }

   /**
    * Thread that registers and then unregisters a {@link NullListener} for
    * key "DEADLOCK", recording how far it got.
    */
   static class RegistrationThread extends Thread
   {
      private DistributedReplicantManager drm;
      private volatile boolean registered = false;
      // Must start out false; otherwise isUnregistered() is true even if the
      // unregisterListener() call never returns.
      private volatile boolean unregistered = false;

      RegistrationThread(DistributedReplicantManager drm)
      {
         this.drm = drm;
      }

      public void run()
      {
         NullListener listener = new NullListener();
         drm.registerListener("DEADLOCK", listener);
         registered = true;
         drm.unregisterListener("DEADLOCK", listener);
         unregistered = true;
      }

      /** @return <code>true</code> once registerListener() has returned */
      public boolean isRegistered()
      {
         return registered;
      }

      /** @return <code>true</code> once unregisterListener() has returned */
      public boolean isUnregistered()
      {
         return unregistered;
      }

   }

   /**
    * ReplicantListener that ignores all notifications.
    */
   static class NullListener
      implements DistributedReplicantManager.ReplicantListener
   {
      public void replicantsChanged(String key, List newReplicants,
                                    int newReplicantsViewId)
      {
         // deliberately empty
      }
   }

   /**
    * Listener that reacts to changes in master status for its key by
    * deploying or undeploying a second service (key + "A") through a
    * {@link MockDeployer}, i.e. it calls back into the DRM from within a
    * listener notification.
    */
   static class MockHASingletonDeployer
      implements DistributedReplicantManager.ReplicantListener
   {
      DistributedReplicantManager drm;
      MockDeployer deployer;
      String key;
      boolean master = false;
      NullListener deploymentListener = new NullListener();
      Exception ex;
      Logger log;
      Object mutex = new Object();

      MockHASingletonDeployer(MockDeployer deployer, String key, Logger log)
      {
         this.drm = deployer.getDRM();
         this.deployer = deployer;
         this.key = key;
         this.log = log;
      }

      public void replicantsChanged(String key,
                                    List newReplicants,
                                    int newReplicantsViewId)
      {
         if (this.key.equals(key))
         {
            synchronized (mutex)
            {
               boolean nowMaster = drm.isMasterReplica(key);

               try
               {
                  if (!master && nowMaster)
                  {
                     log.debug(Thread.currentThread().getName() +
                               " Deploying " + key);
                     deployer.deploy(key + "A", key, deploymentListener);
                  }
                  else if (master && !nowMaster)
                  {
                     log.debug(Thread.currentThread().getName() +
                               " undeploying " + key);
                     deployer.undeploy(key + "A", deploymentListener);
                  }
                  else
                  {
                     log.debug(Thread.currentThread().getName() +
                               " -- no status change in " + key +
                               " -- master = " + master);
                  }
                  master = nowMaster;
               }
               catch (Exception e)
               {
                  e.printStackTrace();
                  // Only the first failure is retained
                  if (ex == null)
                     ex = e;
               }
            }
         }
      }

      /** @return the first exception seen while (un)deploying, or <code>null</code> */
      public Exception getException()
      {
         return ex;
      }

   }

   /**
    * Thread that, once it obtains a permit from the semaphore, deploys and
    * undeploys its key through the {@link MockDeployer} LOOP_COUNT times.
    */
   static class DeployerThread extends Thread
   {
      Semaphore semaphore;
      MockDeployer deployer;
      DistributedReplicantManager.ReplicantListener listener;
      String key;
      // Polled by the test thread while this thread runs
      volatile Exception ex;
      // -1 until the loop has been entered
      volatile int count = -1;
      Logger log;

      DeployerThread(MockDeployer deployer,
                     String key,
                     DistributedReplicantManager.ReplicantListener listener,
                     Semaphore semaphore,
                     Logger log)
      {
         super("Deployer " + key);
         this.deployer = deployer;
         this.listener = listener;
         this.key = key;
         this.semaphore = semaphore;
         this.log = log;
      }

      public void run()
      {
         boolean acquired = false;
         try
         {
            acquired = semaphore.attempt(60000);
            if (!acquired)
               throw new Exception("Cannot acquire semaphore");
            SecureRandom random = new SecureRandom();
            for (count = 0; count < LOOP_COUNT; count++)
            {
               deployer.deploy(key, "JGroups", listener);

               sleepThread(random.nextInt(50));
               deployer.undeploy(key, listener);
            }
         }
         catch (Exception e)
         {
            e.printStackTrace();
            ex = e;
         }
         finally
         {
            if (acquired)
               semaphore.release();
         }
      }

      /** @return any exception caught in run(), or <code>null</code> */
      public Exception getException()
      {
         return ex;
      }

      /** @return completed iterations, or -1 if the loop has not started */
      public int getCount()
      {
         return count;
      }
   }

   /**
    * Thread that, once it obtains a permit from the semaphore, repeatedly
    * picks a random key and toggles a replicant for it on behalf of a named
    * node via the DRM's _add/_remove methods. It runs
    * <code>weightFactor * LOOP_COUNT</code> iterations.
    */
   static class JGroupsThread extends Thread
   {
      Semaphore semaphore;
      DistributedReplicantManagerImpl drm;
      String[] keys;
      String nodeName;
      // Polled by the test thread while this thread runs
      volatile Exception ex;
      // -1 until the loop has been entered
      volatile int count = -1;
      int weightFactor;

      JGroupsThread(DistributedReplicantManagerImpl drm,
                    String[] keys,
                    String nodeName,
                    Semaphore semaphore)
      {
         super("JGroups");
         this.drm = drm;
         this.keys = keys;
         this.semaphore = semaphore;
         this.nodeName = nodeName;
         // Parenthesised so the 2.5 multiplier is applied before truncation;
         // "(int) 2.5 * n" would evaluate as 2 * n.
         this.weightFactor = (int) (2.5 * keys.length);
      }

      public void run()
      {
         boolean acquired = false;
         try
         {
            acquired = semaphore.attempt(60000);
            if (!acquired)
               throw new Exception("Cannot acquire semaphore");
            boolean[] added = new boolean[keys.length];
            SecureRandom random = new SecureRandom();

            for (count = 0; count < weightFactor * LOOP_COUNT; count++)
            {
               int pos = random.nextInt(keys.length);
               if (added[pos])
               {
                  drm._remove(keys[pos], nodeName);
                  added[pos] = false;
               }
               else
               {
                  drm._add(keys[pos], nodeName, "");
                  added[pos] = true;
               }
               sleepThread(random.nextInt(30));
            }
         }
         catch (Exception e)
         {
            e.printStackTrace();
            ex = e;
         }
         finally
         {
            if (acquired)
               semaphore.release();
         }
      }

      /** @return any exception caught in run(), or <code>null</code> */
      public Exception getException()
      {
         return ex;
      }

      /**
       * @return the iteration count scaled down by the weight factor so it is
       *         comparable with LOOP_COUNT, or a negative value if the loop
       *         has not started
       */
      public int getCount()
      {
         int current = count;
         // -1 / weightFactor would truncate to 0 and hide the
         // "not started" sentinel from callers polling for it.
         if (current < 0)
            return current;
         return (current / weightFactor);
      }

   }

   /**
    * Mimics a deployer: deploy registers a listener and adds a replicant,
    * undeploy removes the replicant and unregisters the listener. Both hold
    * this object's monitor while calling into the DRM (and for a further
    * 10 ms sleep).
    */
   static class MockDeployer
   {
      DistributedReplicantManager drm;

      MockDeployer(DistributedReplicantManager drm)
      {
         this.drm = drm;
      }

      void deploy(String key, String replicant,
                  DistributedReplicantManager.ReplicantListener listener)
         throws Exception
      {
         synchronized (this)
         {
            drm.registerListener(key, listener);
            drm.add(key, replicant);
            sleepThread(10);
         }
      }

      void undeploy(String key,
                    DistributedReplicantManager.ReplicantListener listener)
         throws Exception
      {
         synchronized (this)
         {
            drm.remove(key);
            drm.unregisterListener(key, listener);
            sleepThread(10);
         }
      }

      DistributedReplicantManager getDRM()
      {
         return drm;
      }
   }

   /**
    * Listener that remembers the last replicant list it was given and
    * records (in <code>clean</code>) whether every list seen so far was in
    * strictly ascending order. Replicants are expected to be Integers.
    */
   static class CachingListener implements ReplicantListener
   {
      List replicants = null;
      // Read by the test thread after notifications have been delivered
      volatile boolean clean = true;

      public void replicantsChanged(String key, List newReplicants,
                                    int newReplicantsViewId)
      {
         this.replicants = newReplicants;
         if (clean && newReplicants != null)
         {
            int last = Integer.MIN_VALUE;
            for (Iterator iter = newReplicants.iterator(); iter.hasNext(); )
            {
               int cur = ((Integer) iter.next()).intValue();
               if (last >= cur)
               {
                  clean = false;
                  break;
               }

               last = cur;
            }
         }
      }

   }

   /** Monitor used by tests to hold BlockingListenerThread callbacks in place. */
   private static final Object lock = new Object();
   /** Number of deploy/undeploy cycles each DeployerThread performs. */
   private static final int LOOP_COUNT = 30;

   public static Test suite() throws Exception
   {
      Test t1 = getDeploySetup(DRMTestCase.class, "drm-tests.sar");
      return t1;
   }

   public DRMTestCase(String name)
   {
      super(name);
   }

   /**
    * Uses the <code>jboss.test:service=DRMTestCase</code> MBean on two
    * servers to check that each node reports its own address as its local
    * replicant, that both nodes see the same replicants, and that replicants
    * added on either node under keys "key0".."key9" are visible on both.
    */
   public void testStateReplication()
      throws Exception
   {
      log.debug("+++ testStateReplication");
      log.info("java.rmi.server.hostname="+System.getProperty("java.rmi.server.hostname"));
      RMIAdaptor[] adaptors = getAdaptors();
      String[] servers = super.getServers();
      RMIAdaptorExt server0 = (RMIAdaptorExt) adaptors[0];
      log.info("server0: "+server0);
      ObjectName clusterService = new ObjectName("jboss:service=DefaultPartition");
      Vector view0 = (Vector) server0.getAttribute(clusterService, "CurrentView");
      log.info("server0: CurrentView, "+view0);
      ObjectName drmService = new ObjectName("jboss.test:service=DRMTestCase");
      IReplicants drm0 = (IReplicants)
         MBeanServerInvocationHandler.newProxyInstance(server0, drmService,
                                                       IReplicants.class, true);
      log.info(MBeanServerInvocationHandler.class.getProtectionDomain());
      TestListener listener = new TestListener(log);
      server0.addNotificationListener(drmService, listener, null, null);
      log.info("server0 addNotificationListener");
      String address = (String) drm0.lookupLocalReplicant();
      log.info("server0: lookupLocalReplicant: "+address);
      assertTrue("server0: address("+address+") == server0("+servers[0]+")",
                 address.equals(servers[0]));

      RMIAdaptorExt server1 = (RMIAdaptorExt) adaptors[1];
      log.info("server1: "+server1);
      Vector view1 = (Vector) server1.getAttribute(clusterService, "CurrentView");
      log.info("server1: CurrentView, "+view1);
      IReplicants drm1 = (IReplicants)
         MBeanServerInvocationHandler.newProxyInstance(server1, drmService,
                                                       IReplicants.class, true);
      server1.addNotificationListener(drmService, listener, null, null);
      log.info("server1 addNotificationListener");
      address = (String) drm1.lookupLocalReplicant();
      log.info("server1: lookupLocalReplicant: "+address);
      assertTrue("server1: address("+address+") == server1("+servers[1]+")",
                 address.equals(servers[1]));

      // Both nodes must report the same set of replicants
      List replicants0 = drm0.lookupReplicants();
      List replicants1 = drm1.lookupReplicants();
      assertTrue("size of replicants0 == replicants1)",
                 replicants0.size() == replicants1.size());
      HashSet testSet = new HashSet(replicants0);
      for (int n = 0; n < replicants0.size(); n ++)
      {
         Object entry = replicants1.get(n);
         assertTrue("replicants0 contains:"+entry, testSet.contains(entry));
      }

      // Add one replicant per key on each node
      for (int n = 0; n < 10; n ++)
      {
         drm0.add("key"+n, "data"+n+".0");
         drm1.add("key"+n, "data"+n+".1");
      }
      // Node 0's local replicant must be visible in both nodes' lists
      for (int n = 0; n < 10; n ++)
      {
         String key = "key"+n;
         log.info("key: "+key);
         replicants0 = drm0.lookupReplicants(key);
         replicants1 = drm1.lookupReplicants(key);
         log.info("replicants0: "+replicants0);
         log.info("replicants1: "+replicants1);
         HashSet testSet0 = new HashSet(replicants0);
         HashSet testSet1 = new HashSet(replicants1);
         assertTrue("size of replicants0 == replicants1)",
                    replicants0.size() == replicants1.size());
         Object entry = drm0.lookupLocalReplicant(key);
         log.info("drm0.lookupLocalReplicant, key="+key+", entry="+entry);
         assertTrue("replicants0 contains:"+entry, testSet0.contains(entry));
         assertTrue("replicants1 contains:"+entry, testSet1.contains(entry));
      }

      for (int n = 0; n < 10; n ++)
      {
         drm0.remove("key"+n);
      }

      server0.removeNotificationListener(drmService, listener);
      server1.removeNotificationListener(drmService, listener);
   }

   /**
    * Checks <code>isMasterReplica()</code> across a simulated startup with
    * four remote replicants already present, a split that leaves only the
    * local node, removal and re-addition of the local replicant, and a
    * merge back with the remote nodes.
    */
   public void testIsMasterReplica() throws Exception
   {
      log.debug("+++ testIsMasterReplica()");

      MBeanServer mbeanServer =
         MBeanServerFactory.createMBeanServer("mockPartition");
      try
      {
         ClusterNode localAddress = new ClusterNode(new IpAddress("127.0.0.1", 12345));
         MockHAPartition partition = new MockHAPartition(localAddress);

         DistributedReplicantManagerImpl drm =
            new DistributedReplicantManagerImpl(partition);

         drm.create();

         // Four "remote" nodes on ports 12341..12344
         Vector remoteAddresses = new Vector();
         for (int i = 1; i < 5; i++)
            remoteAddresses.add(new ClusterNode(new IpAddress("127.0.0.1", 12340 + i)));

         Vector allNodes = new Vector(remoteAddresses);
         allNodes.add(localAddress);
         partition.setCurrentViewClusterNodes(allNodes);

         // Build the state handed to the DRM: each remote node holds
         // replicant Integer(i + 1) for service "Mock"
         HashMap replicants = new HashMap();
         ArrayList remoteResponses = new ArrayList();
         for (int i = 0; i < remoteAddresses.size(); i++)
         {
            ClusterNode node = (ClusterNode) remoteAddresses.elementAt(i);
            Integer replicant = new Integer(i + 1);
            replicants.put(node.getName(), replicant);
            HashMap localReplicant = new HashMap();
            localReplicant.put("Mock", replicant);
            remoteResponses.add(new Object[] {node.getName(), localReplicant});
         }
         HashMap services = new HashMap();
         services.put("Mock", replicants);

         // View id for "Mock": sum of the remote replicants' hash codes
         int hash = 0;
         for (int i = 1; i < 5; i++)
            hash += (new Integer(i)).hashCode();

         HashMap intraviewIds = new HashMap();
         intraviewIds.put("Mock", new Integer(hash));

         partition.setRemoteReplicants(remoteResponses);

         drm.setCurrentState(new Object[] {services, intraviewIds });

         drm.start();

         // Add the local replicant
         drm.add("Mock", new Integer(5));

         assertFalse("Local node is not master after startup",
                     drm.isMasterReplica("Mock"));

         // Simulate a split that leaves only the local node in the view
         Vector localOnly = new Vector();
         localOnly.add(localAddress);

         partition.setCurrentViewClusterNodes(localOnly);
         partition.setRemoteReplicants(new ArrayList());

         drm.membershipChanged(remoteAddresses, new Vector(), localOnly);

         assertTrue("Local node is master after split", drm.isMasterReplica("Mock"));

         // Drop the local replicant
         drm.remove("Mock");

         assertFalse("Local node is not master after dropping replicant",
                     drm.isMasterReplica("Mock"));

         // Restore the local replicant
         drm.add("Mock", new Integer(5));

         // Simulate a merge that brings the remote nodes back
         Vector mergeGroups = new Vector();
         mergeGroups.add(remoteAddresses);
         mergeGroups.add(localOnly);

         partition.setCurrentViewClusterNodes(allNodes);
         partition.setRemoteReplicants(remoteResponses);

         drm.membershipChangedDuringMerge(new Vector(), remoteAddresses,
                                          allNodes, mergeGroups);

         // Give the DRM time to process the merge
         // (presumably handled asynchronously -- TODO confirm)
         sleepThread(100);

         assertFalse("Local node is not master after merge",
                     drm.isMasterReplica("Mock"));
      }
      finally
      {
         MBeanServerFactory.releaseMBeanServer(mbeanServer);
      }
   }


   /**
    * Checks that a second thread can register and unregister a listener for
    * an unrelated key while a listener notification for key "TEST" is blocked
    * part-way through a local add, and again during a local remove.
    */
   public void testKeyListenerDeadlock() throws Exception
   {
      log.debug("+++ testKeyListenerDeadlock()");

      MBeanServer mbeanServer =
         MBeanServerFactory.createMBeanServer("mockPartition");
      try
      {
         ClusterNode localAddress = new ClusterNode(new IpAddress("127.0.0.1", 12345));
         MockHAPartition partition = new MockHAPartition(localAddress);

         DistributedReplicantManagerImpl drm =
            new DistributedReplicantManagerImpl(partition);

         drm.create();

         // Four "remote" nodes on ports 12341..12344
         Vector remoteAddresses = new Vector();
         for (int i = 1; i < 5; i++)
            remoteAddresses.add(new ClusterNode(new IpAddress("127.0.0.1", 12340 + i)));

         Vector allNodes = new Vector(remoteAddresses);
         allNodes.add(localAddress);
         partition.setCurrentViewClusterNodes(allNodes);

         drm.start();

         BlockingListenerThread blt =
            new BlockingListenerThread(drm, true, null);

         // Hold the lock so the listener callback blocks once the add fires it
         synchronized (lock)
         {
            blt.start();

            sleepThread(50);

            assertTrue("Test thread is alive", blt.isAlive());
            assertTrue("Test thread is blocking", blt.isBlocking());

            RegistrationThread rt = new RegistrationThread(drm);
            rt.start();

            sleepThread(50);

            assertTrue("No deadlock on listener registration", rt.isRegistered());

            assertTrue("No deadlock on listener unregistration", rt.isUnregistered());

            assertNull("No exception in deadlock tester", blt.getException());

            assertTrue("Test thread is still blocking", blt.isBlocking());
            assertTrue("Test thread is still alive", blt.isAlive());
         }

         drm.unregisterListener("TEST", blt);

         sleepThread(50);

         // Same check, this time blocking during a remove
         blt = new BlockingListenerThread(drm, false, null);

         synchronized (lock)
         {
            blt.start();

            sleepThread(50);

            assertTrue("Test thread is alive", blt.isAlive());
            assertTrue("Test thread is blocking", blt.isBlocking());

            RegistrationThread rt = new RegistrationThread(drm);
            rt.start();

            sleepThread(50);

            assertTrue("No deadlock on listener registration", rt.isRegistered());

            assertTrue("No deadlock on listener unregistration", rt.isUnregistered());

            assertNull("No exception in deadlock tester", blt.getException());

            assertTrue("Test thread is still blocking", blt.isBlocking());
            assertTrue("Test thread is still alive", blt.isAlive());
         }
      }
      finally
      {
         MBeanServerFactory.releaseMBeanServer(mbeanServer);
      }
   }


   /**
    * Checks that a thread calling the DRM's _add/_remove methods returns
    * even though the listener notification the call triggers is blocked:
    * the calling thread must have finished while the listener callback is
    * still waiting for the lock.
    */
   public void testRemoteCallBlocking() throws Exception
   {
      log.debug("+++ testRemoteCallBlocking()");

      MBeanServer mbeanServer =
         MBeanServerFactory.createMBeanServer("mockPartition");
      try
      {
         ClusterNode localAddress = new ClusterNode(new IpAddress("127.0.0.1", 12345));
         MockHAPartition partition = new MockHAPartition(localAddress);

         DistributedReplicantManagerImpl drm =
            new DistributedReplicantManagerImpl(partition);

         drm.create();

         // Four "remote" nodes on ports 12341..12344
         Vector remoteAddresses = new Vector();
         for (int i = 1; i < 5; i++)
            remoteAddresses.add(new ClusterNode(new IpAddress("127.0.0.1", 12340 + i)));

         Vector allNodes = new Vector(remoteAddresses);
         allNodes.add(localAddress);
         partition.setCurrentViewClusterNodes(allNodes);

         drm.start();

         String sender = ((ClusterNode)remoteAddresses.get(0)).getName();
         BlockingListenerThread blt =
            new BlockingListenerThread(drm, true, sender);

         // Hold the lock so the listener callback blocks once the _add fires it
         synchronized (lock)
         {
            blt.start();

            sleepThread(50);

            assertFalse("JGroups thread is not alive", blt.isAlive());
            assertTrue("Async handler thread is blocking", blt.isBlocking());

            assertNull("No exception in JGroups thread", blt.getException());
         }

         drm.unregisterListener("TEST", blt);

         sleepThread(50);

         // Same check for _remove
         blt = new BlockingListenerThread(drm, false, sender);

         synchronized (lock)
         {
            blt.start();

            sleepThread(50);

            assertFalse("JGroups thread is not alive", blt.isAlive());
            assertTrue("Async handler thread is blocking", blt.isBlocking());

            assertNull("No exception in JGroups thread", blt.getException());
         }
      }
      finally
      {
         MBeanServerFactory.releaseMBeanServer(mbeanServer);
      }
   }

   /**
    * Runs {@link #addRemoveDeadlockTest(boolean)} in non-conflicting mode.
    */
   public void testNonConflictingAddRemoveDeadlock() throws Exception
   {

      log.debug("+++ testNonConflictingAddRemoveDeadlock()");

      addRemoveDeadlockTest(false);
   }

   /**
    * Runs {@link #addRemoveDeadlockTest(boolean)} in conflicting mode. The
    * method name does not start with "test", so the JUnit 3 runner does not
    * pick it up.
    */
   public void badtestConflictingAddRemoveDeadlock() throws Exception
   {
      log.debug("+++ testConflictingAddRemoveDeadlock()");

      addRemoveDeadlockTest(true);
   }

   /**
    * Runs one {@link DeployerThread} per key concurrently with a
    * {@link JGroupsThread} and verifies that they all finish within the time
    * limit without exceptions.
    *
    * @param conflicting if <code>true</code>, each DeployerThread's listener
    *                    is a {@link MockHASingletonDeployer} for the same key
    *                    the JGroupsThread manipulates; if <code>false</code>,
    *                    a single MockHASingletonDeployer is registered under
    *                    the separate key "HASingleton" and the deployer
    *                    threads use {@link NullListener}s
    */
   private void addRemoveDeadlockTest(boolean conflicting) throws Exception
   {
      String[] keys = { "A", "B", "C", "D", "E" };
      int count = keys.length;

      MBeanServer mbeanServer =
         MBeanServerFactory.createMBeanServer("mockPartition");
      try
      {
         ClusterNode localAddress = new ClusterNode(new IpAddress("127.0.0.1", 12345));
         MockHAPartition partition = new MockHAPartition(localAddress);

         DistributedReplicantManagerImpl drm =
            new DistributedReplicantManagerImpl(partition);

         drm.create();

         // One "remote" node
         Vector remoteAddresses = new Vector();
         ClusterNode remote = new ClusterNode(new IpAddress("127.0.0.1", 12341));
         remoteAddresses.add(remote);

         Vector allNodes = new Vector(remoteAddresses);
         allNodes.add(localAddress);
         partition.setCurrentViewClusterNodes(allNodes);

         drm.start();

         MockDeployer deployer = new MockDeployer(drm);

         if (!conflicting)
         {
            MockHASingletonDeployer listener =
               new MockHASingletonDeployer(deployer, "HASingleton", log);

            drm.registerListener("HASingleton", listener);
            drm.add("HASingleton", "HASingleton");
         }

         // Take every permit so the worker threads cannot proceed until
         // they are all created and the permits are released together
         Semaphore semaphore = new Semaphore(count + 1);
         for (int i = 0; i <= count; i++)
            semaphore.acquire();

         DeployerThread[] deployers = new DeployerThread[keys.length];
         for (int i = 0; i < count; i++)
         {
            DistributedReplicantManager.ReplicantListener listener = null;
            if (conflicting)
            {
               listener = new MockHASingletonDeployer(deployer, keys[i], log);
            }
            else
            {
               listener = new NullListener();
            }
            deployers[i] = new DeployerThread(deployer, keys[i], listener, semaphore, log);
            deployers[i].start();
         }

         String[] jgKeys = keys;
         if (!conflicting)
         {
            // The JGroups thread also toggles the "HASingleton" key
            jgKeys = new String[keys.length + 1];
            System.arraycopy(keys, 0, jgKeys, 0, keys.length);
            jgKeys[keys.length] = "HASingleton";
         }
         JGroupsThread jgThread = new JGroupsThread(drm, jgKeys, remote.getName(), semaphore);
         jgThread.start();

         // Let the threads run
         semaphore.release(count + 1);

         boolean reacquired = false;
         try
         {
            // Wait (up to 5 s in total) for every thread to enter its loop
            long maxElapsed = System.currentTimeMillis() + 5000;
            for (int i = 0; i < keys.length; i++)
            {
               if (deployers[i].getCount() < 0)
               {
                  assertTrue("Thread " + keys[i] + " started in time",
                             maxElapsed - System.currentTimeMillis() > 0);
                  sleepThread(10);
                  i--;   // re-check the same thread
               }
            }

            while (jgThread.getCount() < 0)
            {
               assertTrue("jgThread started in time",
                          maxElapsed - System.currentTimeMillis() > 0);
               sleepThread(10);
            }

            // Reacquire all permits; each thread releases one when it ends
            maxElapsed = System.currentTimeMillis() + (500 * LOOP_COUNT);
            for (int i = 0; i <= count; i++)
            {
               long waitTime = maxElapsed - System.currentTimeMillis();
               assertTrue("Acquired thread " + i, semaphore.attempt(waitTime));
            }

            reacquired = true;

            // Every thread must have completed all iterations cleanly
            for (int i = 0; i < keys.length; i++)
            {
               assertEquals("Thread " + keys[i] + " finished", LOOP_COUNT, deployers[i].getCount());
               assertNull("Thread " + keys[i] + " saw no exceptions", deployers[i].getException());
            }
            assertEquals("JGroups Thread finished", LOOP_COUNT, jgThread.getCount());
            assertNull("JGroups Thread saw no exceptions", jgThread.getException());
         }
         finally
         {

            if (!reacquired)
            {
               // Report how far each thread got
               for (int i = 0; i < keys.length; i++)
               {
                  if (deployers[i].getException() != null)
                  {
                     System.out.println("Exception in deployer " + i);
                     deployers[i].getException().printStackTrace(System.out);
                  }
                  else
                  {
                     System.out.println("Thread " + i + " completed " + deployers[i].getCount());
                  }
               }
               if (jgThread.getException() != null)
               {
                  System.out.println("Exception in jgThread");
                  jgThread.getException().printStackTrace(System.out);
               }
               else
               {
                  System.out.println("jgThread completed " + jgThread.getCount());
               }
            }

            // Interrupt any thread still running; if that surfaces as an
            // InterruptedException in the thread, print where it was
            if (jgThread.isAlive())
            {
               jgThread.interrupt();
               sleepThread(5);
               printStackTrace(jgThread.getName(), jgThread.getException());
            }
            for (int i = 0; i < keys.length; i++)
            {
               if (deployers[i].isAlive())
               {
                  deployers[i].interrupt();
                  sleepThread(5);
                  printStackTrace(deployers[i].getName(), deployers[i].getException());
               }
            }

         }
      }
      finally
      {
         MBeanServerFactory.releaseMBeanServer(mbeanServer);
      }
   }

   /**
    * Randomly adds and removes replicants for five nodes (the local node is
    * index 2) and checks after every change that both the node-name lookup
    * and the replicant lookup list the currently added nodes in node-index
    * order, and that a registered listener never saw a misordered list.
    */
   public void testReplicantOrder() throws Exception
   {
      MBeanServer mbeanServer =
         MBeanServerFactory.createMBeanServer("mockPartitionA");
      try
      {

         // Nodes on ports 12340..12344; node i's replicant is Integer(i)
         ClusterNode[] nodes = new ClusterNode[5];
         String[] names = new String[nodes.length];
         Integer[] replicants = new Integer[nodes.length];
         Vector allNodes = new Vector();
         for (int i = 0; i < nodes.length; i++)
         {
            nodes[i] = new ClusterNode(new IpAddress("127.0.0.1", 12340 + i));
            allNodes.add(nodes[i]);
            names[i] = nodes[i].getName();
            replicants[i] = new Integer(i);
         }

         MockHAPartition partition = new MockHAPartition(nodes[2]);
         partition.setCurrentViewClusterNodes(allNodes);

         DistributedReplicantManagerImpl drm =
            new DistributedReplicantManagerImpl(partition);
         drm.create();
         drm.start();

         CachingListener listener = new CachingListener();
         drm.registerListener("TEST", listener);

         SecureRandom random = new SecureRandom();
         boolean[] added = new boolean[nodes.length];
         List lookup = null;
         for (int i = 0; i < 10; i++)
         {
            // Toggle the replicant of a randomly chosen node
            int node = random.nextInt(nodes.length);
            if (added[node])
            {
               if (node == 2)
                  drm.remove("TEST");
               else
                  drm._remove("TEST", nodes[node].getName());
               added[node] = false;
            }
            else
            {
               if (node == 2)
                  drm.add("TEST", replicants[node]);
               else
                  drm._add("TEST", nodes[node].getName(), replicants[node]);
               added[node] = true;
            }

            lookup = maskListClass(drm.lookupReplicantsNodeNames("TEST"));
            confirmReplicantList(lookup, names, added);

            lookup = maskListClass(drm.lookupReplicants("TEST"));
            confirmReplicantList(lookup, replicants, added);

         }

         // sleep(long) is not declared in this file; presumably inherited
         // from the test base class. Gives pending notifications time to
         // reach the listener.
         sleep(25);

         assertTrue("Listener saw no misordered lists", listener.clean);

      }
      finally
      {
         MBeanServerFactory.releaseMBeanServer(mbeanServer);
      }
   }

   /**
    * Asserts that <code>current</code> contains exactly the elements
    * <code>all[i]</code> for which <code>added[i]</code> is true, in index
    * order.
    */
   private void confirmReplicantList(List current, Object[] all, boolean[] added)
   {
      Iterator iter = current.iterator();
      for (int i = 0; i < added.length; i++)
      {
         if (added[i])
         {
            assertTrue("List has more replicants", iter.hasNext());
            assertEquals("Replicant for node " + i + " is next",
                         all[i], iter.next());
         }
      }
      assertFalse("List has no extra replicants", iter.hasNext());
   }


   /**
    * Returns the given list as an ArrayList: the list itself if it already
    * is one, an empty list for <code>null</code>, otherwise a copy.
    */
   private List maskListClass(List toMask)
   {
      if (toMask instanceof ArrayList)
         return toMask;
      else if (toMask == null)
         return new ArrayList();
      else
         return new ArrayList(toMask);
   }

   /**
    * Sleeps for the given time. An interrupt is reported via a stack trace
    * and otherwise ignored, so callers carry on as if the sleep had finished.
    */
   private static void sleepThread(long millis)
   {
      try
      {
         Thread.sleep(millis);
      }
      catch (InterruptedException e)
      {
         e.printStackTrace();
      }
   }

   /**
    * Prints the stack trace of <code>e</code> to System.out, but only if it
    * is an InterruptedException; anything else (including
    * <code>null</code>) is ignored.
    */
   private static void printStackTrace(String threadName, Exception e)
   {
      if (e instanceof InterruptedException)
      {
         System.out.println("Stack trace for " + threadName);
         e.printStackTrace(System.out);
         System.out.println();
      }
   }

}