jdk/test/javax/management/eventService/ReconnectableConnectorTest.java
author sjiang
Thu, 31 Jul 2008 15:31:13 +0200
changeset 1004 5ba8217eb504
child 1247 b4c26443dee5
permissions -rw-r--r--
5108776: Add reliable event handling to the JMX API 6218920: API bug - impossible to delete last MBeanServerForwarder on a connector Reviewed-by: emcmanus

/*
 * Copyright 2007 Sun Microsystems, Inc.  All Rights Reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 */

/*
 * @test ReconnectableJMXConnector
 * @bug 5108776
 * @summary Check that the Event Service can be used to build a
 * ReconnectableJMXConnector.
 * @author Eamonn McManus
 */

import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Date;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.management.ListenerNotFoundException;
import javax.management.MBeanServer;
import javax.management.MBeanServerConnection;
import javax.management.MBeanServerFactory;
import javax.management.Notification;
import javax.management.NotificationBroadcasterSupport;
import javax.management.NotificationFilter;
import javax.management.NotificationListener;
import javax.management.ObjectName;
import javax.management.event.EventClient;
import javax.management.remote.JMXConnectionNotification;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXConnectorServer;
import javax.management.remote.JMXConnectorServerFactory;
import javax.management.remote.JMXServiceURL;
import javax.security.auth.Subject;

/*
 * This test checks that it is possible to use the Event Service to create
 * a "reconnectable connector".
 *
 * In the JMX Remote API, we deliberately specified that a connector client
 * (JMXConnector) that encounters a network failure is then permanently broken.
 * The idea being that adding recovery logic to the basic connector client
 * would make it much more complicated and less reliable, and the logic would
 * in any case never correspond to what a given situation needs. Some of
 * the tough questions are: Should the connector try to mask the failure by
 * blocking operations until the failure is resolved? How long should the
 * connector try to reestablish the connection before giving up? Rather than
 * try to solve this problem in the connector, we suggested that people who
 * wanted to recover from network failures could implement the JMXConnector
 * interface themselves so that it forwards to a wrapped JMXConnector that can
 * be replaced in case of network failure.
 *
 * This works fine except that the connector client has state,
 * in the form of listeners added by the user through the
 * MBeanServerConnection.addNotificationListener method. It's possible
 * for the wrapper to keep track of these listeners as well as forwarding
 * them to the wrapped JMXConnector, so that it can reapply them to
 * a replacement JMXConnector after recovering from a failure. But it's quite
 * tricky, particularly because of the two- and four-argument versions of
 * removeNotificationListener.
 *
 * The Event Service can take care of this for you through the EventClient
 * class. Listeners added through that class are implemented in a way that
 * doesn't require the connector client to maintain any state, so they should
 * continue to work transparently after replacing the wrapped JMXConnector.
 * This test is a proof of concept that shows it works.  Quite a number of
 * details would need to be changed to build a reliable reconnectable
 * connector.
 *
 * The test simulates network failure by rewrapping the wrapped JMXConnector's
 * MBeanServerConnection (MBSC) in a "breakable" MBSC which we can cause
 * to stop working.  We do this in two phases.  The first phase suspends
 * any MBSC calls just at the point where they would return to the caller.
 * The goal here is to block an EventClientDelegateMBean.fetchNotifications
 * operation when it has received notifications but not yet delivered them
 * to the EventClient.  This is the most delicate point where a breakage
 * can occur, because the EventClientDelegate must not drop those notifs
 * from its buffer until another fetchNotifs call arrives with a later
 * sequence number (which is an implicit ack of the previous set of
 * notifs).  Once the fetchNotifs call is suspended, we "kill" the MBSC,
 * causing it to throw IOException from this and any other calls.  That
 * triggers the reconnect logic, which will make a new MBSC and issue
 * the same fetchNotifs call to it.
 *
 * The test could be improved by synchronizing explicitly between the
 * breakable MBSC and the mainline, so we only proceed to kill the MBSC
 * when we are sure that the fetchNotifs call is blocked.  As it is,
 * we have a small delay which both ensures that no notifs are delivered
 * while the connection is suspended and, if the machine is fast enough,
 * allows the fetchNotifs call to reach the blocking point.
 */
public class ReconnectableConnectorTest {
    /**
     * A JMXConnector that forwards to a wrapped JMXConnector and replaces
     * that wrapped connector with a new one in two situations:
     * <ul>
     * <li>a call through it fails with IOException;</li>
     * <li>the wrapped connector emits a
     * {@link JMXConnectionNotification#FAILED} notification.</li>
     * </ul>
     * Only the methods this test needs are implemented; the others throw
     * UnsupportedOperationException.
     */
    private static class ReconnectableJMXConnector implements JMXConnector {
        /*
         * If true, every wrapped MBSC is itself wrapped in a BreakableIH
         * proxy, so that suspend() and kill() can simulate a network
         * failure.  Set it to false to exercise a plain reconnectable
         * connector; suspend() and kill() then cannot be used, because the
         * wrapped MBSC is no longer a BreakableIH proxy.
         */
        private static final boolean USE_BREAKABLE_MBSC = true;

        private final JMXServiceURL url;
        // Current underlying connector; null until the first connect().
        private final AtomicReference<JMXConnector> wrappedJMXC =
                new AtomicReference<JMXConnector>();
        // Current underlying MBSC (a BreakableIH proxy when
        // USE_BREAKABLE_MBSC is true); null until the first connect().
        private final AtomicReference<MBeanServerConnection> wrappedMBSC =
                new AtomicReference<MBeanServerConnection>();
        // Delivers the connection notifications forwarded by
        // FailureListener to listeners added through
        // addConnectionNotificationListener.
        private final NotificationBroadcasterSupport broadcaster =
                new NotificationBroadcasterSupport();
        // Serializes connection (re)establishment.
        private final Lock connectLock = new ReentrantLock();

        ReconnectableJMXConnector(JMXServiceURL url) {
            this.url = url;
        }

        /**
         * Forwards every MBeanServerConnection call to the current wrapped
         * MBSC.  If the call fails with IOException, reconnects and then
         * retries the call exactly once on the (possibly new) wrapped MBSC.
         */
        private class ReconnectIH implements InvocationHandler {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args)
                    throws Throwable {
                try {
                    return method.invoke(wrappedMBSC.get(), args);
                } catch (InvocationTargetException e) {
                    if (e.getCause() instanceof IOException) {
                        connect();
                        try {
                            return method.invoke(wrappedMBSC.get(), args);
                        } catch (InvocationTargetException ee) {
                            throw ee.getCause();
                        }
                    }
                    throw e.getCause();
                }
            }
        }

        /**
         * Listens for notifications from a wrapped connector.  On FAILED it
         * tries to reconnect, and forwards the notification to our own
         * listeners only if reconnecting fails.  NOTIFS_LOST is always
         * forwarded; other notification types are ignored.
         */
        private class FailureListener implements NotificationListener {
            @Override
            public void handleNotification(Notification n, Object h) {
                String type = n.getType();
                if (type.equals(JMXConnectionNotification.FAILED)) {
                    try {
                        connect();
                    } catch (IOException e) {
                        // Could not recover: report the failure upwards.
                        broadcaster.sendNotification(n);
                    }
                } else if (type.equals(JMXConnectionNotification.NOTIFS_LOST)) {
                    broadcaster.sendNotification(n);
                }
            }
        }

        /**
         * Ensures there is a working wrapped connection.  A new one is
         * created when there is none yet, or when the current one fails a
         * probe call.  Concurrent callers are serialized by connectLock.
         *
         * @throws IOException if a new connection cannot be made
         */
        @Override
        public void connect() throws IOException {
            connectLock.lock();
            try {
                connectWithLock();
            } finally {
                connectLock.unlock();
            }
        }

        /** Body of {@link #connect()}; the caller must hold connectLock. */
        private void connectWithLock() throws IOException {
            MBeanServerConnection mbsc = wrappedMBSC.get();
            if (mbsc != null) {
                try {
                    mbsc.getDefaultDomain();
                    return;  // the connection works
                } catch (IOException e) {
                    // OK: the connection doesn't work, so make a new one
                }
            }
            // This is where we would need to add the fancy logic that
            // allows the connection to keep failing for a while
            // before giving up.
            JMXConnector jmxc = JMXConnectorFactory.connect(url);
            jmxc.addConnectionNotificationListener(
                    new FailureListener(), null, null);
            wrappedJMXC.set(jmxc);
            MBeanServerConnection newMBSC = jmxc.getMBeanServerConnection();
            if (USE_BREAKABLE_MBSC) {
                newMBSC = (MBeanServerConnection) Proxy.newProxyInstance(
                        MBeanServerConnection.class.getClassLoader(),
                        new Class<?>[] {MBeanServerConnection.class},
                        new BreakableIH(newMBSC));
            }
            wrappedMBSC.set(newMBSC);
        }

        /** Returns the BreakableIH behind the current wrapped MBSC. */
        private BreakableIH breakableIH() {
            MBeanServerConnection mbsc = wrappedMBSC.get();
            return (BreakableIH) Proxy.getInvocationHandler(mbsc);
        }

        /** First phase of a simulated failure; see BreakableIH.suspend(). */
        void suspend() {
            BreakableIH ih = breakableIH();
            ih.suspend();
        }

        /**
         * Second phase of a simulated failure.  Closes the wrapped
         * connector, then kills the current BreakableIH so that calls
         * through it (including ones blocked by suspend()) throw
         * IOException.
         */
        void kill() throws IOException {
            BreakableIH ih = breakableIH();
            wrappedJMXC.get().close();
            ih.kill();
        }

        @Override
        public void connect(Map<String, ?> env) throws IOException {
            throw new UnsupportedOperationException("Not supported yet.");
        }

        // The EventClient-backed MBSC handed out to users, created lazily.
        private final AtomicReference<MBeanServerConnection> mbscRef =
                new AtomicReference<MBeanServerConnection>();

        /**
         * Connects if necessary and returns an MBSC whose calls go through
         * ReconnectIH.  The MBSC is wrapped by
         * EventClient.getEventClientConnection, so that listeners added
         * through it are handled by the Event Service.  The same instance
         * is returned on later calls.
         */
        @Override
        public MBeanServerConnection getMBeanServerConnection()
                throws IOException {
            connect();
            // Synchro here is not strictly correct: two threads could make
            // an MBSC at the same time.  OK for a test but beware for real
            // code.
            MBeanServerConnection mbsc = mbscRef.get();
            if (mbsc != null)
                return mbsc;
            mbsc = (MBeanServerConnection) Proxy.newProxyInstance(
                    MBeanServerConnection.class.getClassLoader(),
                    new Class<?>[] {MBeanServerConnection.class},
                    new ReconnectIH());
            mbsc = EventClient.getEventClientConnection(mbsc);
            mbscRef.set(mbsc);
            return mbsc;
        }

        @Override
        public MBeanServerConnection getMBeanServerConnection(
                Subject delegationSubject) throws IOException {
            throw new UnsupportedOperationException("Not supported yet.");
        }

        /**
         * Closes the current wrapped connector.  Does nothing if no
         * connection has been made yet.
         */
        @Override
        public void close() throws IOException {
            JMXConnector jmxc = wrappedJMXC.get();
            if (jmxc != null)
                jmxc.close();
        }

        @Override
        public void addConnectionNotificationListener(
                NotificationListener l, NotificationFilter f, Object h) {
            broadcaster.addNotificationListener(l, f, h);
        }

        @Override
        public void removeConnectionNotificationListener(NotificationListener l)
                throws ListenerNotFoundException {
            broadcaster.removeNotificationListener(l);
        }

        @Override
        public void removeConnectionNotificationListener(
                NotificationListener l, NotificationFilter f, Object h)
                throws ListenerNotFoundException {
            broadcaster.removeNotificationListener(l, f, h);
        }

        /**
         * Returns the connection id of the current wrapped connector.
         *
         * @throws IOException if no connection has been made yet, or if the
         *         wrapped connector reports one
         */
        @Override
        public String getConnectionId() throws IOException {
            JMXConnector jmxc = wrappedJMXC.get();
            if (jmxc == null)
                throw new IOException("Not connected");
            return jmxc.getConnectionId();
        }
    }

    /**
     * InvocationHandler that allows a two-phase "break" of the object it
     * wraps.
     * <ul>
     * <li>After {@link #suspend()}, a call that completes on the wrapped
     * object blocks just before returning.</li>
     * <li>{@link #kill()} unblocks suspended threads and makes them throw
     * IOException.</li>
     * <li>Calls made after kill() also throw IOException.  The wrapped call
     * is still performed; only its result is replaced by the exception.</li>
     * </ul>
     * An exception thrown by the wrapped object itself is rethrown
     * unchanged in every state.  If kill() does not follow suspend() within
     * 3 seconds, the blocked call fails with
     * InterruptedException("Timed out").
     */
    private static class BreakableIH implements InvocationHandler {
        private final Object wrapped;
        private final Holder<String> state = new Holder<String>("running");

        BreakableIH(Object wrapped) {
            this.wrapped = wrapped;
        }

        void suspend() {
            state.set("suspended");
        }

        void kill() {
            state.set("killed");
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args)
                throws Throwable {
            Object result;
            try {
                result = method.invoke(wrapped, args);
            } catch (InvocationTargetException e) {
                throw e.getCause();
            }
            String s = state.get();
            if (s.equals("suspended")) {
                // Block until kill(), then fail like a broken connection
                // instead of returning the result, so the caller must
                // fail over and repeat the call.
                state.waitUntilEqual("killed", 3, TimeUnit.SECONDS);
                s = "killed";
            }
            if (s.equals("killed"))
                throw new IOException("Broken");
            return result;
        }
    }

    /**
     * A lock-protected value that threads can wait on until it takes a
     * particular value.
     */
    private static class Holder<T> {
        private T held;
        private final Lock lock = new ReentrantLock();
        private final Condition changed = lock.newCondition();

        Holder(T value) {
            lock.lock();
            try {
                this.held = value;
            } finally {
                lock.unlock();
            }
        }

        /**
         * Waits until the held value equals {@code value}.  The timeout is
         * measured with a monotonic clock, so changes to the system time
         * do not affect it.
         *
         * @throws InterruptedException if the thread is interrupted, or
         *         (with message "Timed out") if the timeout elapses first
         */
        void waitUntilEqual(T value, long timeout, TimeUnit units)
                throws InterruptedException {
            long remainingNanos = units.toNanos(timeout);
            lock.lock();
            try {
                while (!value.equals(held)) {
                    if (remainingNanos <= 0)
                        throw new InterruptedException("Timed out");
                    remainingNanos = changed.awaitNanos(remainingNanos);
                }
            } finally {
                lock.unlock();
            }
        }

        /** Sets the held value and wakes every waiting thread. */
        void set(T value) {
            lock.lock();
            try {
                held = value;
                changed.signalAll();
            } finally {
                lock.unlock();
            }
        }

        T get() {
            lock.lock();
            try {
                return held;
            } finally {
                lock.unlock();
            }
        }
    }

    /**
     * Listener that stores every notification it receives in a bounded
     * queue (capacity 100), so the test can wait for notifications in
     * order.
     */
    private static class StoreListener implements NotificationListener {
        final BlockingQueue<Notification> queue =
                new ArrayBlockingQueue<Notification>(100);

        @Override
        public void handleNotification(Notification n, Object h) {
            queue.add(n);
        }

        /**
         * Removes and returns the next stored notification, waiting up to
         * the given time for one to arrive.
         *
         * @throws NoSuchElementException if none arrives in time
         */
        Notification nextNotification(long time, TimeUnit units)
                throws InterruptedException {
            Notification n = queue.poll(time, units);
            if (n == null)
                throw new NoSuchElementException("Notification wait timed out");
            return n;
        }

        int notifCount() {
            return queue.size();
        }
    }

    public static interface SenderMBean {}

    /**
     * MBean that emits a notification of type "type" on each send().
     * Sequence numbers increase from 0.
     */
    public static class Sender
            extends NotificationBroadcasterSupport implements SenderMBean {
        private final AtomicLong seqNo = new AtomicLong(0);

        void send() {
            Notification n =
                    new Notification("type", this, seqNo.getAndIncrement());
            sendNotification(n);
        }
    }

    /**
     * Runs the test.
     * <ol>
     * <li>Registers a Sender MBean and starts an RMI connector server.</li>
     * <li>Connects through a ReconnectableJMXConnector and checks that
     * notifications arrive.</li>
     * <li>Simulates a network failure with suspend() and kill().</li>
     * <li>Checks that the client reconnects and still receives the
     * notification sent while the connection was suspended.</li>
     * </ol>
     */
    public static void main(String[] args) throws Exception {
        MBeanServer mbs = MBeanServerFactory.newMBeanServer();
        Sender sender = new Sender();
        ObjectName name = new ObjectName("a:b=c");
        mbs.registerMBean(sender, name);

        System.out.println("Creating connector server");
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///");
        JMXConnectorServer cs = JMXConnectorServerFactory.newJMXConnectorServer(
                url, null, mbs);
        cs.start();

        StoreListener csListener = new StoreListener();
        cs.addNotificationListener(csListener, null, null);

        System.out.println("Creating reconnectable client");
        JMXServiceURL addr = cs.getAddress();
        ReconnectableJMXConnector cc = new ReconnectableJMXConnector(addr);
        MBeanServerConnection mbsc = cc.getMBeanServerConnection();

        System.out.println("Checking server has sent new-client notif");
        Notification csn = csListener.nextNotification(1, TimeUnit.SECONDS);
        assertEquals("CS notif type",
                JMXConnectionNotification.OPENED, csn.getType());

        StoreListener listener = new StoreListener();
        mbsc.addNotificationListener(name, listener, null, null);

        System.out.println("Sending 10 notifs and checking they are received");
        for (int i = 0; i < 10; i++)
            sender.send();
        checkNotifs(listener, 0, 10);

        System.out.println("Suspending the fetchNotifs operation");
        cc.suspend();
        System.out.println("Sending a notif while fetchNotifs is suspended");
        sender.send();
        System.out.println("Brief wait before checking no notif is received");
        Thread.sleep(2);
        // Uncomment to diagnose a hang at this point:
        // dumpThreads();
        assertEquals("notif queue while connector suspended",
                0, listener.notifCount());
        assertEquals("connector server notif queue while connector suspended",
                0, csListener.notifCount());

        System.out.println("Breaking the connection so fetchNotifs will fail over");
        cc.kill();

        System.out.println("Checking that client has reconnected");
        csn = csListener.nextNotification(1, TimeUnit.SECONDS);
        assertEquals("First CS notif type after kill",
                JMXConnectionNotification.CLOSED, csn.getType());
        csn = csListener.nextNotification(1, TimeUnit.SECONDS);
        assertEquals("Second CS notif type after kill",
                JMXConnectionNotification.OPENED, csn.getType());

        System.out.println("Checking that suspended notif has been received");
        checkNotifs(listener, 10, 11);
    }

    /**
     * Checks that the next notifications received by {@code sl} have
     * sequence numbers {@code start} (inclusive) to {@code stop}
     * (exclusive), in order.  Each notification is awaited for up to one
     * second.
     *
     * @throws NoSuchElementException if a notification does not arrive
     *         in time
     * @throws Exception if a sequence number is wrong
     */
    private static void checkNotifs(StoreListener sl, long start, long stop)
            throws Exception {
        for (long i = start; i < stop; i++) {
            Notification n = sl.nextNotification(1, TimeUnit.SECONDS);
            assertEquals("received sequence number", i, n.getSequenceNumber());
        }
    }

    /** Fails the test unless {@code expect.equals(actual)}. */
    private static void assertEquals(String what, Object expect, Object actual)
            throws Exception {
        if (!expect.equals(actual)) {
            fail(what + " should be " + expect + " but is " + actual);
        }
    }

    private static void fail(String why) throws Exception {
        throw new Exception("TEST FAILED: " + why);
    }

    /** Debugging aid: prints the stack trace of every live thread. */
    private static void dumpThreads() {
        System.out.println("Thread stack dump");
        Map<Thread, StackTraceElement[]> traces = Thread.getAllStackTraces();
        for (Map.Entry<Thread, StackTraceElement[]> entry : traces.entrySet()) {
            Thread t = entry.getKey();
            System.out.println("===Thread " + t.getName() + "===");
            for (StackTraceElement ste : entry.getValue())
                System.out.println("    " + ste);
        }
    }
}