jdk/test/javax/management/eventService/ReconnectableConnectorTest.java
changeset 4156 acaa49a2768a
parent 4155 460e37d40f12
child 4159 9e3aae7675f1
--- a/jdk/test/javax/management/eventService/ReconnectableConnectorTest.java	Wed Oct 21 16:28:57 2009 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,488 +0,0 @@
-/*
- * Copyright 2007-2008 Sun Microsystems, Inc.  All Rights Reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
- */
-
-/*
- * @test ReconnectableJMXConnector
- * @bug 5108776
- * @summary Check that the Event Service can be used to build a
- * ReconnectableJMXConnector.
- * @author Eamonn McManus
- */
-
-import java.io.IOException;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.util.Date;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import javax.management.ListenerNotFoundException;
-import javax.management.MBeanServer;
-import javax.management.MBeanServerConnection;
-import javax.management.MBeanServerFactory;
-import javax.management.Notification;
-import javax.management.NotificationBroadcasterSupport;
-import javax.management.NotificationFilter;
-import javax.management.NotificationListener;
-import javax.management.ObjectName;
-import javax.management.event.EventClient;
-import javax.management.remote.JMXConnectionNotification;
-import javax.management.remote.JMXConnector;
-import javax.management.remote.JMXConnectorFactory;
-import javax.management.remote.JMXConnectorServer;
-import javax.management.remote.JMXConnectorServerFactory;
-import javax.management.remote.JMXServiceURL;
-import javax.security.auth.Subject;
-
-/*
- * This test checks that it is possible to use the Event Service to create
- * a "reconnectable connector".
- *
- * In the JMX Remote API, we deliberately specified that a connector client
- * (JMXConnector) that encounters a network failure is then permanently broken.
- * The idea being that adding recovery logic to the basic connector client
- * would make it much more complicated and less reliable, and the logic would
- * in any case never correspond to what a given situation needs. Some of
- * the tough questions are: Should the connector try to mask the failure by
- * blocking operations until the failure is resolved? How long should the
- * connector try to reestablish the connection before giving up? Rather than
- * try to solve this problem in the connector, we suggested that people who
- * wanted to recover from network failures could implement the JMXConnector
- * interface themselves so that it forwards to a wrapped JMXConnector that can
- * be replaced in case of network failure.
- *
- * This works fine except that the connector client has state,
- * in the form of listeners added by the user through the
- * MBeanServerConnection.addNotificationListener method. It's possible
- * for the wrapper to keep track of these listeners as well as forwarding
- * them to the wrapped JMXConnector, so that it can reapply them to
- * a replacement JMXConnector after failure recover. But it's quite
- * tricky, particularly because of the two- and four-argument versions of
- * removeNotificationListener.
- *
- * The Event Service can take care of this for you through the EventClient
- * class. Listeners added through that class are implemented in a way that
- * doesn't require the connector client to maintain any state, so they should
- * continue to work transparently after replacing the wrapped JMXConnector.
- * This test is a proof of concept that shows it works.  Quite a number of
- * details would need to be changed to build a reliable reconnectable
- * connector.
- *
- * The test simulates network failure by rewrapping the wrapped JMXConnector's
- * MBeanServerConnection (MBSC) in a "breakable" MBSC which we can cause
- * to stop working.  We do this in two phases.  The first phase suspends
- * any MBSC calls just at the point where they would return to the caller.
- * The goal here is to block an EventClientDelegateMBean.fetchNotifications
- * operation when it has received notifications but not yet delivered them
- * to the EventClient.  This is the most delicate point where a breakage
- * can occur, because the EventClientDelegate must not drop those notifs
- * from its buffer until another fetchNotifs call arrives with a later
- * sequence number (which is an implicit ack of the previous set of
- * notifs).  Once the fetchNotifs call is suspended, we "kill" the MBSC,
- * causing it to throw IOException from this and any other calls.  That
- * triggers the reconnect logic, which will make a new MBSC and issue
- * the same fetchNotifs call to it.
- *
- * The test could be improved by synchronizing explicitly between the
- * breakable MBSC and the mainline, so we only proceed to kill the MBSC
- * when we are sure that the fetchNotifs call is blocked.  As it is,
- * we have a small delay which both ensures that no notifs are delivered
- * while the connection is suspended, and if the machine is fast enough
- * allows the fetchNotifs call to reach the blocking point.
- */
-public class ReconnectableConnectorTest {
-    private static class ReconnectableJMXConnector implements JMXConnector {
-        private final JMXServiceURL url;
-        private AtomicReference<JMXConnector> wrappedJMXC =
-                new AtomicReference<JMXConnector>();
-        private AtomicReference<MBeanServerConnection> wrappedMBSC =
-                new AtomicReference<MBeanServerConnection>();
-        private final NotificationBroadcasterSupport broadcaster =
-                new NotificationBroadcasterSupport();
-        private final Lock connectLock = new ReentrantLock();
-
-        ReconnectableJMXConnector(JMXServiceURL url) {
-            this.url = url;
-        }
-
-        private class ReconnectIH implements InvocationHandler {
-            public Object invoke(Object proxy, Method method, Object[] args)
-                    throws Throwable {
-                try {
-                    return method.invoke(wrappedMBSC.get(), args);
-                } catch (InvocationTargetException e) {
-                    if (e.getCause() instanceof IOException) {
-                        connect();
-                        try {
-                            return method.invoke(wrappedMBSC.get(),args);
-                        } catch (InvocationTargetException ee) {
-                            throw ee.getCause();
-                        }
-                    }
-                    throw e.getCause();
-                }
-            }
-        }
-
-        private class FailureListener implements NotificationListener {
-            public void handleNotification(Notification n, Object h) {
-                String type = n.getType();
-                if (type.equals(JMXConnectionNotification.FAILED)) {
-                    try {
-                        connect();
-                    } catch (IOException e) {
-                        broadcaster.sendNotification(n);
-                    }
-                } else if (type.equals(JMXConnectionNotification.NOTIFS_LOST))
-                    broadcaster.sendNotification(n);
-            }
-        }
-
-        public void connect() throws IOException {
-            connectLock.lock();
-            try {
-                connectWithLock();
-            } finally {
-                connectLock.unlock();
-            }
-        }
-
-        private void connectWithLock() throws IOException {
-            MBeanServerConnection mbsc = wrappedMBSC.get();
-            if (mbsc != null) {
-                try {
-                    mbsc.getDefaultDomain();
-                    return;  // the connection works
-                } catch (IOException e) {
-                    // OK: the connection doesn't work, so make a new one
-                }
-            }
-            // This is where we would need to add the fancy logic that
-            // allows the connection to keep failing for a while
-            // before giving up.
-            JMXConnector jmxc = JMXConnectorFactory.connect(url);
-            jmxc.addConnectionNotificationListener(
-                    new FailureListener(), null, null);
-            wrappedJMXC.set(jmxc);
-            if (false)
-                wrappedMBSC.set(jmxc.getMBeanServerConnection());
-            else {
-                mbsc = jmxc.getMBeanServerConnection();
-                InvocationHandler ih = new BreakableIH(mbsc);
-                mbsc = (MBeanServerConnection) Proxy.newProxyInstance(
-                        MBeanServerConnection.class.getClassLoader(),
-                        new Class<?>[] {MBeanServerConnection.class},
-                        ih);
-                wrappedMBSC.set(mbsc);
-            }
-        }
-
-        private BreakableIH breakableIH() {
-            MBeanServerConnection mbsc = wrappedMBSC.get();
-            return (BreakableIH) Proxy.getInvocationHandler(mbsc);
-        }
-
-        void suspend() {
-            BreakableIH ih = breakableIH();
-            ih.suspend();
-        }
-
-        void kill() throws IOException {
-            BreakableIH ih = breakableIH();
-            wrappedJMXC.get().close();
-            ih.kill();
-        }
-
-        public void connect(Map<String, ?> env) throws IOException {
-            throw new UnsupportedOperationException("Not supported yet.");
-        }
-
-        private final AtomicReference<MBeanServerConnection> mbscRef =
-                new AtomicReference<MBeanServerConnection>();
-
-        public MBeanServerConnection getMBeanServerConnection()
-                throws IOException {
-            connect();
-            // Synchro here is not strictly correct: two threads could make
-            // an MBSC at the same time.  OK for a test but beware for real
-            // code.
-            MBeanServerConnection mbsc = mbscRef.get();
-            if (mbsc != null)
-                return mbsc;
-            mbsc = (MBeanServerConnection) Proxy.newProxyInstance(
-                    MBeanServerConnection.class.getClassLoader(),
-                    new Class<?>[] {MBeanServerConnection.class},
-                    new ReconnectIH());
-            mbsc = EventClient.getEventClientConnection(mbsc);
-            mbscRef.set(mbsc);
-            return mbsc;
-        }
-
-        public MBeanServerConnection getMBeanServerConnection(
-                Subject delegationSubject) throws IOException {
-            throw new UnsupportedOperationException("Not supported yet.");
-        }
-
-        public void close() throws IOException {
-            wrappedJMXC.get().close();
-        }
-
-        public void addConnectionNotificationListener(
-                NotificationListener l, NotificationFilter f, Object h) {
-            broadcaster.addNotificationListener(l, f, h);
-        }
-
-        public void removeConnectionNotificationListener(NotificationListener l)
-                throws ListenerNotFoundException {
-            broadcaster.removeNotificationListener(l);
-        }
-
-        public void removeConnectionNotificationListener(
-                NotificationListener l, NotificationFilter f, Object h)
-                throws ListenerNotFoundException {
-            broadcaster.removeNotificationListener(l, f, h);
-        }
-
-        public String getConnectionId() throws IOException {
-            return wrappedJMXC.get().getConnectionId();
-        }
-    }
-
-    // InvocationHandler that allows us to perform a two-phase "break" of
-    // an object.  The first phase suspends the object, so that calls to
-    // it are blocked just before they return.  The second phase unblocks
-    // suspended threads and causes them to throw IOException.
-    private static class BreakableIH implements InvocationHandler {
-        private final Object wrapped;
-        private final Holder<String> state = new Holder<String>("running");
-
-        BreakableIH(Object wrapped) {
-            this.wrapped = wrapped;
-        }
-
-        void suspend() {
-            state.set("suspended");
-        }
-
-        void kill() {
-            state.set("killed");
-        }
-
-        public Object invoke(Object proxy, Method method, Object[] args)
-                throws Throwable {
-            Object result;
-            try {
-                result = method.invoke(wrapped, args);
-            } catch (InvocationTargetException e) {
-                throw e.getCause();
-            }
-            String s = state.get();
-            if (s.equals("suspended"))
-                state.waitUntilEqual("killed", 3, TimeUnit.SECONDS);
-            else if (s.equals("killed"))
-                throw new IOException("Broken");
-            return result;
-        }
-    }
-
-    private static class Holder<T> {
-        private T held;
-        private Lock lock = new ReentrantLock();
-        private Condition changed = lock.newCondition();
-
-        Holder(T value) {
-            lock.lock();
-            this.held = value;
-            lock.unlock();
-        }
-
-        void waitUntilEqual(T value, long timeout, TimeUnit units)
-                throws InterruptedException {
-            long millis = units.toMillis(timeout);
-            long stop = System.currentTimeMillis() + millis;
-            Date stopDate = new Date(stop);
-            lock.lock();
-            try {
-                while (!value.equals(held)) {
-                    boolean ok = changed.awaitUntil(stopDate);
-                    if (!ok)
-                        throw new InterruptedException("Timed out");
-                }
-            } finally {
-                lock.unlock();
-            }
-        }
-
-        void set(T value) {
-            lock.lock();
-            try {
-                held = value;
-                changed.signalAll();
-            } finally {
-                lock.unlock();
-            }
-        }
-
-        T get() {
-            lock.lock();
-            try {
-                return held;
-            } finally {
-                lock.unlock();
-            }
-        }
-    }
-
-    private static class StoreListener implements NotificationListener {
-        final BlockingQueue<Notification> queue =
-                new ArrayBlockingQueue<Notification>(100);
-
-        public void handleNotification(Notification n, Object h) {
-            queue.add(n);
-        }
-
-        Notification nextNotification(long time, TimeUnit units)
-                throws InterruptedException {
-            Notification n = queue.poll(time, units);
-            if (n == null)
-                throw new NoSuchElementException("Notification wait timed out");
-            return n;
-        }
-
-        int notifCount() {
-            return queue.size();
-        }
-    }
-
-    public static interface SenderMBean {}
-    public static class Sender
-            extends NotificationBroadcasterSupport implements SenderMBean {
-        private AtomicLong seqNo = new AtomicLong(0);
-
-        void send() {
-            Notification n =
-                    new Notification("type", this, seqNo.getAndIncrement());
-            sendNotification(n);
-        }
-    }
-
-    public static void main(String[] args) throws Exception {
-        MBeanServer mbs = MBeanServerFactory.newMBeanServer();
-        Sender sender = new Sender();
-        ObjectName name = new ObjectName("a:b=c");
-        mbs.registerMBean(sender, name);
-
-        System.out.println("Creating connector server");
-        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///");
-        JMXConnectorServer cs = JMXConnectorServerFactory.newJMXConnectorServer(
-                url, null, mbs);
-        cs.start();
-
-        StoreListener csListener = new StoreListener();
-        cs.addNotificationListener(csListener, null, null);
-
-        System.out.println("Creating reconnectable client");
-        JMXServiceURL addr = cs.getAddress();
-        ReconnectableJMXConnector cc = new ReconnectableJMXConnector(addr);
-        MBeanServerConnection mbsc = cc.getMBeanServerConnection();
-
-        System.out.println("Checking server has sent new-client notif");
-        Notification csn = csListener.nextNotification(1, TimeUnit.SECONDS);
-        assertEquals("CS notif type",
-                JMXConnectionNotification.OPENED, csn.getType());
-
-        StoreListener listener = new StoreListener();
-        mbsc.addNotificationListener(name, listener, null, null);
-
-        System.out.println("Sending 10 notifs and checking they are received");
-        for (int i = 0; i < 10; i++)
-            sender.send();
-        checkNotifs(listener, 0, 10);
-
-        System.out.println("Suspending the fetchNotifs operation");
-        cc.suspend();
-        System.out.println("Sending a notif while fetchNotifs is suspended");
-        sender.send();
-        System.out.println("Brief wait before checking no notif is received");
-        Thread.sleep(2);
-        // dumpThreads();
-        assertEquals("notif queue while connector suspended",
-                0, listener.notifCount());
-        assertEquals("connector server notif queue while connector suspended",
-                0, csListener.notifCount());
-
-        System.out.println("Breaking the connection so fetchNotifs will fail over");
-        cc.kill();
-
-        System.out.println("Checking that client has reconnected");
-        csn = csListener.nextNotification(1, TimeUnit.SECONDS);
-        assertEquals("First CS notif type after kill",
-                JMXConnectionNotification.CLOSED, csn.getType());
-        csn = csListener.nextNotification(1, TimeUnit.SECONDS);
-        assertEquals("Second CS notif type after kill",
-                JMXConnectionNotification.OPENED, csn.getType());
-
-        System.out.println("Checking that suspended notif has been received");
-        checkNotifs(listener, 10, 11);
-    }
-
-    private static void checkNotifs(
-             StoreListener sl, long start, long stop)
-            throws Exception {
-        for (long i = start; i < stop; i++) {
-            Notification n = sl.nextNotification(1, TimeUnit.SECONDS);
-            assertEquals("received sequence number", i, n.getSequenceNumber());
-        }
-    }
-
-    private static void assertEquals(String what, Object expect, Object actual)
-    throws Exception {
-        if (!expect.equals(actual)) {
-            fail(what + " should be " + expect + " but is " + actual);
-        }
-    }
-
-    private static void fail(String why) throws Exception {
-        throw new Exception("TEST FAILED: " + why);
-    }
-
-    private static void dumpThreads() {
-        System.out.println("Thread stack dump");
-        Map<Thread, StackTraceElement[]> traces = Thread.getAllStackTraces();
-        for (Map.Entry<Thread, StackTraceElement[]> entry : traces.entrySet()) {
-            Thread t = entry.getKey();
-            System.out.println("===Thread " + t.getName() + "===");
-            for (StackTraceElement ste : entry.getValue())
-                System.out.println("    " + ste);
-        }
-    }
-}