jdk/test/javax/management/eventService/ReconnectableConnectorTest.java
changeset 4156 acaa49a2768a
parent 4155 460e37d40f12
child 4159 9e3aae7675f1
equal deleted inserted replaced
4155:460e37d40f12 4156:acaa49a2768a
     1 /*
       
     2  * Copyright 2007-2008 Sun Microsystems, Inc.  All Rights Reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.
       
     8  *
       
     9  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    12  * version 2 for more details (a copy is included in the LICENSE file that
       
    13  * accompanied this code).
       
    14  *
       
    15  * You should have received a copy of the GNU General Public License version
       
    16  * 2 along with this work; if not, write to the Free Software Foundation,
       
    17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    18  *
       
    19  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
       
    20  * CA 95054 USA or visit www.sun.com if you need additional information or
       
    21  * have any questions.
       
    22  */
       
    23 
       
    24 /*
       
    25  * @test ReconnectableJMXConnector
       
    26  * @bug 5108776
       
    27  * @summary Check that the Event Service can be used to build a
       
    28  * ReconnectableJMXConnector.
       
    29  * @author Eamonn McManus
       
    30  */
       
    31 
       
    32 import java.io.IOException;
       
    33 import java.lang.reflect.InvocationHandler;
       
    34 import java.lang.reflect.InvocationTargetException;
       
    35 import java.lang.reflect.Method;
       
    36 import java.lang.reflect.Proxy;
       
    37 import java.util.Date;
       
    38 import java.util.Map;
       
    39 import java.util.NoSuchElementException;
       
    40 import java.util.concurrent.ArrayBlockingQueue;
       
    41 import java.util.concurrent.BlockingQueue;
       
    42 import java.util.concurrent.TimeUnit;
       
    43 import java.util.concurrent.atomic.AtomicLong;
       
    44 import java.util.concurrent.atomic.AtomicReference;
       
    45 import java.util.concurrent.locks.Condition;
       
    46 import java.util.concurrent.locks.Lock;
       
    47 import java.util.concurrent.locks.ReentrantLock;
       
    48 import javax.management.ListenerNotFoundException;
       
    49 import javax.management.MBeanServer;
       
    50 import javax.management.MBeanServerConnection;
       
    51 import javax.management.MBeanServerFactory;
       
    52 import javax.management.Notification;
       
    53 import javax.management.NotificationBroadcasterSupport;
       
    54 import javax.management.NotificationFilter;
       
    55 import javax.management.NotificationListener;
       
    56 import javax.management.ObjectName;
       
    57 import javax.management.event.EventClient;
       
    58 import javax.management.remote.JMXConnectionNotification;
       
    59 import javax.management.remote.JMXConnector;
       
    60 import javax.management.remote.JMXConnectorFactory;
       
    61 import javax.management.remote.JMXConnectorServer;
       
    62 import javax.management.remote.JMXConnectorServerFactory;
       
    63 import javax.management.remote.JMXServiceURL;
       
    64 import javax.security.auth.Subject;
       
    65 
       
    66 /*
       
    67  * This test checks that it is possible to use the Event Service to create
       
    68  * a "reconnectable connector".
       
    69  *
       
    70  * In the JMX Remote API, we deliberately specified that a connector client
       
    71  * (JMXConnector) that encounters a network failure is then permanently broken.
       
    72  * The idea being that adding recovery logic to the basic connector client
       
    73  * would make it much more complicated and less reliable, and the logic would
       
    74  * in any case never correspond to what a given situation needs. Some of
       
    75  * the tough questions are: Should the connector try to mask the failure by
       
    76  * blocking operations until the failure is resolved? How long should the
       
    77  * connector try to reestablish the connection before giving up? Rather than
       
    78  * try to solve this problem in the connector, we suggested that people who
       
    79  * wanted to recover from network failures could implement the JMXConnector
       
    80  * interface themselves so that it forwards to a wrapped JMXConnector that can
       
    81  * be replaced in case of network failure.
       
    82  *
       
    83  * This works fine except that the connector client has state,
       
    84  * in the form of listeners added by the user through the
       
    85  * MBeanServerConnection.addNotificationListener method. It's possible
       
    86  * for the wrapper to keep track of these listeners as well as forwarding
       
    87  * them to the wrapped JMXConnector, so that it can reapply them to
       
    88  * a replacement JMXConnector after failure recover. But it's quite
       
    89  * tricky, particularly because of the two- and four-argument versions of
       
    90  * removeNotificationListener.
       
    91  *
       
    92  * The Event Service can take care of this for you through the EventClient
       
    93  * class. Listeners added through that class are implemented in a way that
       
    94  * doesn't require the connector client to maintain any state, so they should
       
    95  * continue to work transparently after replacing the wrapped JMXConnector.
       
    96  * This test is a proof of concept that shows it works.  Quite a number of
       
    97  * details would need to be changed to build a reliable reconnectable
       
    98  * connector.
       
    99  *
       
   100  * The test simulates network failure by rewrapping the wrapped JMXConnector's
       
   101  * MBeanServerConnection (MBSC) in a "breakable" MBSC which we can cause
       
   102  * to stop working.  We do this in two phases.  The first phase suspends
       
   103  * any MBSC calls just at the point where they would return to the caller.
       
   104  * The goal here is to block an EventClientDelegateMBean.fetchNotifications
       
   105  * operation when it has received notifications but not yet delivered them
       
   106  * to the EventClient.  This is the most delicate point where a breakage
       
   107  * can occur, because the EventClientDelegate must not drop those notifs
       
   108  * from its buffer until another fetchNotifs call arrives with a later
       
   109  * sequence number (which is an implicit ack of the previous set of
       
   110  * notifs).  Once the fetchNotifs call is suspended, we "kill" the MBSC,
       
   111  * causing it to throw IOException from this and any other calls.  That
       
   112  * triggers the reconnect logic, which will make a new MBSC and issue
       
   113  * the same fetchNotifs call to it.
       
   114  *
       
   115  * The test could be improved by synchronizing explicitly between the
       
   116  * breakable MBSC and the mainline, so we only proceed to kill the MBSC
       
   117  * when we are sure that the fetchNotifs call is blocked.  As it is,
       
   118  * we have a small delay which both ensures that no notifs are delivered
       
   119  * while the connection is suspended, and if the machine is fast enough
       
   120  * allows the fetchNotifs call to reach the blocking point.
       
   121  */
       
   122 public class ReconnectableConnectorTest {
       
   123     private static class ReconnectableJMXConnector implements JMXConnector {
       
   124         private final JMXServiceURL url;
       
   125         private AtomicReference<JMXConnector> wrappedJMXC =
       
   126                 new AtomicReference<JMXConnector>();
       
   127         private AtomicReference<MBeanServerConnection> wrappedMBSC =
       
   128                 new AtomicReference<MBeanServerConnection>();
       
   129         private final NotificationBroadcasterSupport broadcaster =
       
   130                 new NotificationBroadcasterSupport();
       
   131         private final Lock connectLock = new ReentrantLock();
       
   132 
       
   133         ReconnectableJMXConnector(JMXServiceURL url) {
       
   134             this.url = url;
       
   135         }
       
   136 
       
   137         private class ReconnectIH implements InvocationHandler {
       
   138             public Object invoke(Object proxy, Method method, Object[] args)
       
   139                     throws Throwable {
       
   140                 try {
       
   141                     return method.invoke(wrappedMBSC.get(), args);
       
   142                 } catch (InvocationTargetException e) {
       
   143                     if (e.getCause() instanceof IOException) {
       
   144                         connect();
       
   145                         try {
       
   146                             return method.invoke(wrappedMBSC.get(),args);
       
   147                         } catch (InvocationTargetException ee) {
       
   148                             throw ee.getCause();
       
   149                         }
       
   150                     }
       
   151                     throw e.getCause();
       
   152                 }
       
   153             }
       
   154         }
       
   155 
       
   156         private class FailureListener implements NotificationListener {
       
   157             public void handleNotification(Notification n, Object h) {
       
   158                 String type = n.getType();
       
   159                 if (type.equals(JMXConnectionNotification.FAILED)) {
       
   160                     try {
       
   161                         connect();
       
   162                     } catch (IOException e) {
       
   163                         broadcaster.sendNotification(n);
       
   164                     }
       
   165                 } else if (type.equals(JMXConnectionNotification.NOTIFS_LOST))
       
   166                     broadcaster.sendNotification(n);
       
   167             }
       
   168         }
       
   169 
       
   170         public void connect() throws IOException {
       
   171             connectLock.lock();
       
   172             try {
       
   173                 connectWithLock();
       
   174             } finally {
       
   175                 connectLock.unlock();
       
   176             }
       
   177         }
       
   178 
       
   179         private void connectWithLock() throws IOException {
       
   180             MBeanServerConnection mbsc = wrappedMBSC.get();
       
   181             if (mbsc != null) {
       
   182                 try {
       
   183                     mbsc.getDefaultDomain();
       
   184                     return;  // the connection works
       
   185                 } catch (IOException e) {
       
   186                     // OK: the connection doesn't work, so make a new one
       
   187                 }
       
   188             }
       
   189             // This is where we would need to add the fancy logic that
       
   190             // allows the connection to keep failing for a while
       
   191             // before giving up.
       
   192             JMXConnector jmxc = JMXConnectorFactory.connect(url);
       
   193             jmxc.addConnectionNotificationListener(
       
   194                     new FailureListener(), null, null);
       
   195             wrappedJMXC.set(jmxc);
       
   196             if (false)
       
   197                 wrappedMBSC.set(jmxc.getMBeanServerConnection());
       
   198             else {
       
   199                 mbsc = jmxc.getMBeanServerConnection();
       
   200                 InvocationHandler ih = new BreakableIH(mbsc);
       
   201                 mbsc = (MBeanServerConnection) Proxy.newProxyInstance(
       
   202                         MBeanServerConnection.class.getClassLoader(),
       
   203                         new Class<?>[] {MBeanServerConnection.class},
       
   204                         ih);
       
   205                 wrappedMBSC.set(mbsc);
       
   206             }
       
   207         }
       
   208 
       
   209         private BreakableIH breakableIH() {
       
   210             MBeanServerConnection mbsc = wrappedMBSC.get();
       
   211             return (BreakableIH) Proxy.getInvocationHandler(mbsc);
       
   212         }
       
   213 
       
   214         void suspend() {
       
   215             BreakableIH ih = breakableIH();
       
   216             ih.suspend();
       
   217         }
       
   218 
       
   219         void kill() throws IOException {
       
   220             BreakableIH ih = breakableIH();
       
   221             wrappedJMXC.get().close();
       
   222             ih.kill();
       
   223         }
       
   224 
       
   225         public void connect(Map<String, ?> env) throws IOException {
       
   226             throw new UnsupportedOperationException("Not supported yet.");
       
   227         }
       
   228 
       
   229         private final AtomicReference<MBeanServerConnection> mbscRef =
       
   230                 new AtomicReference<MBeanServerConnection>();
       
   231 
       
   232         public MBeanServerConnection getMBeanServerConnection()
       
   233                 throws IOException {
       
   234             connect();
       
   235             // Synchro here is not strictly correct: two threads could make
       
   236             // an MBSC at the same time.  OK for a test but beware for real
       
   237             // code.
       
   238             MBeanServerConnection mbsc = mbscRef.get();
       
   239             if (mbsc != null)
       
   240                 return mbsc;
       
   241             mbsc = (MBeanServerConnection) Proxy.newProxyInstance(
       
   242                     MBeanServerConnection.class.getClassLoader(),
       
   243                     new Class<?>[] {MBeanServerConnection.class},
       
   244                     new ReconnectIH());
       
   245             mbsc = EventClient.getEventClientConnection(mbsc);
       
   246             mbscRef.set(mbsc);
       
   247             return mbsc;
       
   248         }
       
   249 
       
   250         public MBeanServerConnection getMBeanServerConnection(
       
   251                 Subject delegationSubject) throws IOException {
       
   252             throw new UnsupportedOperationException("Not supported yet.");
       
   253         }
       
   254 
       
   255         public void close() throws IOException {
       
   256             wrappedJMXC.get().close();
       
   257         }
       
   258 
       
   259         public void addConnectionNotificationListener(
       
   260                 NotificationListener l, NotificationFilter f, Object h) {
       
   261             broadcaster.addNotificationListener(l, f, h);
       
   262         }
       
   263 
       
   264         public void removeConnectionNotificationListener(NotificationListener l)
       
   265                 throws ListenerNotFoundException {
       
   266             broadcaster.removeNotificationListener(l);
       
   267         }
       
   268 
       
   269         public void removeConnectionNotificationListener(
       
   270                 NotificationListener l, NotificationFilter f, Object h)
       
   271                 throws ListenerNotFoundException {
       
   272             broadcaster.removeNotificationListener(l, f, h);
       
   273         }
       
   274 
       
   275         public String getConnectionId() throws IOException {
       
   276             return wrappedJMXC.get().getConnectionId();
       
   277         }
       
   278     }
       
   279 
       
   280     // InvocationHandler that allows us to perform a two-phase "break" of
       
   281     // an object.  The first phase suspends the object, so that calls to
       
   282     // it are blocked just before they return.  The second phase unblocks
       
   283     // suspended threads and causes them to throw IOException.
       
   284     private static class BreakableIH implements InvocationHandler {
       
   285         private final Object wrapped;
       
   286         private final Holder<String> state = new Holder<String>("running");
       
   287 
       
   288         BreakableIH(Object wrapped) {
       
   289             this.wrapped = wrapped;
       
   290         }
       
   291 
       
   292         void suspend() {
       
   293             state.set("suspended");
       
   294         }
       
   295 
       
   296         void kill() {
       
   297             state.set("killed");
       
   298         }
       
   299 
       
   300         public Object invoke(Object proxy, Method method, Object[] args)
       
   301                 throws Throwable {
       
   302             Object result;
       
   303             try {
       
   304                 result = method.invoke(wrapped, args);
       
   305             } catch (InvocationTargetException e) {
       
   306                 throw e.getCause();
       
   307             }
       
   308             String s = state.get();
       
   309             if (s.equals("suspended"))
       
   310                 state.waitUntilEqual("killed", 3, TimeUnit.SECONDS);
       
   311             else if (s.equals("killed"))
       
   312                 throw new IOException("Broken");
       
   313             return result;
       
   314         }
       
   315     }
       
   316 
       
   317     private static class Holder<T> {
       
   318         private T held;
       
   319         private Lock lock = new ReentrantLock();
       
   320         private Condition changed = lock.newCondition();
       
   321 
       
   322         Holder(T value) {
       
   323             lock.lock();
       
   324             this.held = value;
       
   325             lock.unlock();
       
   326         }
       
   327 
       
   328         void waitUntilEqual(T value, long timeout, TimeUnit units)
       
   329                 throws InterruptedException {
       
   330             long millis = units.toMillis(timeout);
       
   331             long stop = System.currentTimeMillis() + millis;
       
   332             Date stopDate = new Date(stop);
       
   333             lock.lock();
       
   334             try {
       
   335                 while (!value.equals(held)) {
       
   336                     boolean ok = changed.awaitUntil(stopDate);
       
   337                     if (!ok)
       
   338                         throw new InterruptedException("Timed out");
       
   339                 }
       
   340             } finally {
       
   341                 lock.unlock();
       
   342             }
       
   343         }
       
   344 
       
   345         void set(T value) {
       
   346             lock.lock();
       
   347             try {
       
   348                 held = value;
       
   349                 changed.signalAll();
       
   350             } finally {
       
   351                 lock.unlock();
       
   352             }
       
   353         }
       
   354 
       
   355         T get() {
       
   356             lock.lock();
       
   357             try {
       
   358                 return held;
       
   359             } finally {
       
   360                 lock.unlock();
       
   361             }
       
   362         }
       
   363     }
       
   364 
       
   365     private static class StoreListener implements NotificationListener {
       
   366         final BlockingQueue<Notification> queue =
       
   367                 new ArrayBlockingQueue<Notification>(100);
       
   368 
       
   369         public void handleNotification(Notification n, Object h) {
       
   370             queue.add(n);
       
   371         }
       
   372 
       
   373         Notification nextNotification(long time, TimeUnit units)
       
   374                 throws InterruptedException {
       
   375             Notification n = queue.poll(time, units);
       
   376             if (n == null)
       
   377                 throw new NoSuchElementException("Notification wait timed out");
       
   378             return n;
       
   379         }
       
   380 
       
   381         int notifCount() {
       
   382             return queue.size();
       
   383         }
       
   384     }
       
   385 
       
   386     public static interface SenderMBean {}
       
   387     public static class Sender
       
   388             extends NotificationBroadcasterSupport implements SenderMBean {
       
   389         private AtomicLong seqNo = new AtomicLong(0);
       
   390 
       
   391         void send() {
       
   392             Notification n =
       
   393                     new Notification("type", this, seqNo.getAndIncrement());
       
   394             sendNotification(n);
       
   395         }
       
   396     }
       
   397 
       
   398     public static void main(String[] args) throws Exception {
       
   399         MBeanServer mbs = MBeanServerFactory.newMBeanServer();
       
   400         Sender sender = new Sender();
       
   401         ObjectName name = new ObjectName("a:b=c");
       
   402         mbs.registerMBean(sender, name);
       
   403 
       
   404         System.out.println("Creating connector server");
       
   405         JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///");
       
   406         JMXConnectorServer cs = JMXConnectorServerFactory.newJMXConnectorServer(
       
   407                 url, null, mbs);
       
   408         cs.start();
       
   409 
       
   410         StoreListener csListener = new StoreListener();
       
   411         cs.addNotificationListener(csListener, null, null);
       
   412 
       
   413         System.out.println("Creating reconnectable client");
       
   414         JMXServiceURL addr = cs.getAddress();
       
   415         ReconnectableJMXConnector cc = new ReconnectableJMXConnector(addr);
       
   416         MBeanServerConnection mbsc = cc.getMBeanServerConnection();
       
   417 
       
   418         System.out.println("Checking server has sent new-client notif");
       
   419         Notification csn = csListener.nextNotification(1, TimeUnit.SECONDS);
       
   420         assertEquals("CS notif type",
       
   421                 JMXConnectionNotification.OPENED, csn.getType());
       
   422 
       
   423         StoreListener listener = new StoreListener();
       
   424         mbsc.addNotificationListener(name, listener, null, null);
       
   425 
       
   426         System.out.println("Sending 10 notifs and checking they are received");
       
   427         for (int i = 0; i < 10; i++)
       
   428             sender.send();
       
   429         checkNotifs(listener, 0, 10);
       
   430 
       
   431         System.out.println("Suspending the fetchNotifs operation");
       
   432         cc.suspend();
       
   433         System.out.println("Sending a notif while fetchNotifs is suspended");
       
   434         sender.send();
       
   435         System.out.println("Brief wait before checking no notif is received");
       
   436         Thread.sleep(2);
       
   437         // dumpThreads();
       
   438         assertEquals("notif queue while connector suspended",
       
   439                 0, listener.notifCount());
       
   440         assertEquals("connector server notif queue while connector suspended",
       
   441                 0, csListener.notifCount());
       
   442 
       
   443         System.out.println("Breaking the connection so fetchNotifs will fail over");
       
   444         cc.kill();
       
   445 
       
   446         System.out.println("Checking that client has reconnected");
       
   447         csn = csListener.nextNotification(1, TimeUnit.SECONDS);
       
   448         assertEquals("First CS notif type after kill",
       
   449                 JMXConnectionNotification.CLOSED, csn.getType());
       
   450         csn = csListener.nextNotification(1, TimeUnit.SECONDS);
       
   451         assertEquals("Second CS notif type after kill",
       
   452                 JMXConnectionNotification.OPENED, csn.getType());
       
   453 
       
   454         System.out.println("Checking that suspended notif has been received");
       
   455         checkNotifs(listener, 10, 11);
       
   456     }
       
   457 
       
   458     private static void checkNotifs(
       
   459              StoreListener sl, long start, long stop)
       
   460             throws Exception {
       
   461         for (long i = start; i < stop; i++) {
       
   462             Notification n = sl.nextNotification(1, TimeUnit.SECONDS);
       
   463             assertEquals("received sequence number", i, n.getSequenceNumber());
       
   464         }
       
   465     }
       
   466 
       
   467     private static void assertEquals(String what, Object expect, Object actual)
       
   468     throws Exception {
       
   469         if (!expect.equals(actual)) {
       
   470             fail(what + " should be " + expect + " but is " + actual);
       
   471         }
       
   472     }
       
   473 
       
   474     private static void fail(String why) throws Exception {
       
   475         throw new Exception("TEST FAILED: " + why);
       
   476     }
       
   477 
       
   478     private static void dumpThreads() {
       
   479         System.out.println("Thread stack dump");
       
   480         Map<Thread, StackTraceElement[]> traces = Thread.getAllStackTraces();
       
   481         for (Map.Entry<Thread, StackTraceElement[]> entry : traces.entrySet()) {
       
   482             Thread t = entry.getKey();
       
   483             System.out.println("===Thread " + t.getName() + "===");
       
   484             for (StackTraceElement ste : entry.getValue())
       
   485                 System.out.println("    " + ste);
       
   486         }
       
   487     }
       
   488 }