5108776: Add reliable event handling to the JMX API
6218920: API bug - impossible to delete last MBeanServerForwarder on a connector
Reviewed-by: emcmanus
/*
* Copyright 2007 Sun Microsystems, Inc. All Rights Reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
* CA 95054 USA or visit www.sun.com if you need additional information or
* have any questions.
*/
/*
* @test ReconnectableJMXConnector
* @bug 5108776
* @summary Check that the Event Service can be used to build a
* ReconnectableJMXConnector.
* @author Eamonn McManus
*/
import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Date;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.management.ListenerNotFoundException;
import javax.management.MBeanServer;
import javax.management.MBeanServerConnection;
import javax.management.MBeanServerFactory;
import javax.management.Notification;
import javax.management.NotificationBroadcasterSupport;
import javax.management.NotificationFilter;
import javax.management.NotificationListener;
import javax.management.ObjectName;
import javax.management.event.EventClient;
import javax.management.remote.JMXConnectionNotification;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXConnectorServer;
import javax.management.remote.JMXConnectorServerFactory;
import javax.management.remote.JMXServiceURL;
import javax.security.auth.Subject;
/*
* This test checks that it is possible to use the Event Service to create
* a "reconnectable connector".
*
* In the JMX Remote API, we deliberately specified that a connector client
* (JMXConnector) that encounters a network failure is then permanently broken.
* The idea being that adding recovery logic to the basic connector client
* would make it much more complicated and less reliable, and the logic would
* in any case never correspond to what a given situation needs. Some of
* the tough questions are: Should the connector try to mask the failure by
* blocking operations until the failure is resolved? How long should the
* connector try to reestablish the connection before giving up? Rather than
* try to solve this problem in the connector, we suggested that people who
* wanted to recover from network failures could implement the JMXConnector
* interface themselves so that it forwards to a wrapped JMXConnector that can
* be replaced in case of network failure.
*
* This works fine except that the connector client has state,
* in the form of listeners added by the user through the
* MBeanServerConnection.addNotificationListener method. It's possible
* for the wrapper to keep track of these listeners as well as forwarding
* them to the wrapped JMXConnector, so that it can reapply them to
* a replacement JMXConnector after failure recover. But it's quite
* tricky, particularly because of the two- and four-argument versions of
* removeNotificationListener.
*
* The Event Service can take care of this for you through the EventClient
* class. Listeners added through that class are implemented in a way that
* doesn't require the connector client to maintain any state, so they should
* continue to work transparently after replacing the wrapped JMXConnector.
* This test is a proof of concept that shows it works. Quite a number of
* details would need to be changed to build a reliable reconnectable
* connector.
*
* The test simulates network failure by rewrapping the wrapped JMXConnector's
* MBeanServerConnection (MBSC) in a "breakable" MBSC which we can cause
* to stop working. We do this in two phases. The first phase suspends
* any MBSC calls just at the point where they would return to the caller.
* The goal here is to block an EventClientDelegateMBean.fetchNotifications
* operation when it has received notifications but not yet delivered them
* to the EventClient. This is the most delicate point where a breakage
* can occur, because the EventClientDelegate must not drop those notifs
* from its buffer until another fetchNotifs call arrives with a later
* sequence number (which is an implicit ack of the previous set of
* notifs). Once the fetchNotifs call is suspended, we "kill" the MBSC,
* causing it to throw IOException from this and any other calls. That
* triggers the reconnect logic, which will make a new MBSC and issue
* the same fetchNotifs call to it.
*
* The test could be improved by synchronizing explicitly between the
* breakable MBSC and the mainline, so we only proceed to kill the MBSC
* when we are sure that the fetchNotifs call is blocked. As it is,
* we have a small delay which both ensures that no notifs are delivered
* while the connection is suspended, and if the machine is fast enough
* allows the fetchNotifs call to reach the blocking point.
*/
public class ReconnectableConnectorTest {
private static class ReconnectableJMXConnector implements JMXConnector {
private final JMXServiceURL url;
private AtomicReference<JMXConnector> wrappedJMXC =
new AtomicReference<JMXConnector>();
private AtomicReference<MBeanServerConnection> wrappedMBSC =
new AtomicReference<MBeanServerConnection>();
private final NotificationBroadcasterSupport broadcaster =
new NotificationBroadcasterSupport();
private final Lock connectLock = new ReentrantLock();
ReconnectableJMXConnector(JMXServiceURL url) {
this.url = url;
}
private class ReconnectIH implements InvocationHandler {
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
try {
return method.invoke(wrappedMBSC.get(), args);
} catch (InvocationTargetException e) {
if (e.getCause() instanceof IOException) {
connect();
try {
return method.invoke(wrappedMBSC.get(),args);
} catch (InvocationTargetException ee) {
throw ee.getCause();
}
}
throw e.getCause();
}
}
}
private class FailureListener implements NotificationListener {
public void handleNotification(Notification n, Object h) {
String type = n.getType();
if (type.equals(JMXConnectionNotification.FAILED)) {
try {
connect();
} catch (IOException e) {
broadcaster.sendNotification(n);
}
} else if (type.equals(JMXConnectionNotification.NOTIFS_LOST))
broadcaster.sendNotification(n);
}
}
public void connect() throws IOException {
connectLock.lock();
try {
connectWithLock();
} finally {
connectLock.unlock();
}
}
private void connectWithLock() throws IOException {
MBeanServerConnection mbsc = wrappedMBSC.get();
if (mbsc != null) {
try {
mbsc.getDefaultDomain();
return; // the connection works
} catch (IOException e) {
// OK: the connection doesn't work, so make a new one
}
}
// This is where we would need to add the fancy logic that
// allows the connection to keep failing for a while
// before giving up.
JMXConnector jmxc = JMXConnectorFactory.connect(url);
jmxc.addConnectionNotificationListener(
new FailureListener(), null, null);
wrappedJMXC.set(jmxc);
if (false)
wrappedMBSC.set(jmxc.getMBeanServerConnection());
else {
mbsc = jmxc.getMBeanServerConnection();
InvocationHandler ih = new BreakableIH(mbsc);
mbsc = (MBeanServerConnection) Proxy.newProxyInstance(
MBeanServerConnection.class.getClassLoader(),
new Class<?>[] {MBeanServerConnection.class},
ih);
wrappedMBSC.set(mbsc);
}
}
private BreakableIH breakableIH() {
MBeanServerConnection mbsc = wrappedMBSC.get();
return (BreakableIH) Proxy.getInvocationHandler(mbsc);
}
void suspend() {
BreakableIH ih = breakableIH();
ih.suspend();
}
void kill() throws IOException {
BreakableIH ih = breakableIH();
wrappedJMXC.get().close();
ih.kill();
}
public void connect(Map<String, ?> env) throws IOException {
throw new UnsupportedOperationException("Not supported yet.");
}
private final AtomicReference<MBeanServerConnection> mbscRef =
new AtomicReference<MBeanServerConnection>();
public MBeanServerConnection getMBeanServerConnection()
throws IOException {
connect();
// Synchro here is not strictly correct: two threads could make
// an MBSC at the same time. OK for a test but beware for real
// code.
MBeanServerConnection mbsc = mbscRef.get();
if (mbsc != null)
return mbsc;
mbsc = (MBeanServerConnection) Proxy.newProxyInstance(
MBeanServerConnection.class.getClassLoader(),
new Class<?>[] {MBeanServerConnection.class},
new ReconnectIH());
mbsc = EventClient.getEventClientConnection(mbsc);
mbscRef.set(mbsc);
return mbsc;
}
public MBeanServerConnection getMBeanServerConnection(
Subject delegationSubject) throws IOException {
throw new UnsupportedOperationException("Not supported yet.");
}
public void close() throws IOException {
wrappedJMXC.get().close();
}
public void addConnectionNotificationListener(
NotificationListener l, NotificationFilter f, Object h) {
broadcaster.addNotificationListener(l, f, h);
}
public void removeConnectionNotificationListener(NotificationListener l)
throws ListenerNotFoundException {
broadcaster.removeNotificationListener(l);
}
public void removeConnectionNotificationListener(
NotificationListener l, NotificationFilter f, Object h)
throws ListenerNotFoundException {
broadcaster.removeNotificationListener(l, f, h);
}
public String getConnectionId() throws IOException {
return wrappedJMXC.get().getConnectionId();
}
}
// InvocationHandler that allows us to perform a two-phase "break" of
// an object. The first phase suspends the object, so that calls to
// it are blocked just before they return. The second phase unblocks
// suspended threads and causes them to throw IOException.
private static class BreakableIH implements InvocationHandler {
private final Object wrapped;
private final Holder<String> state = new Holder<String>("running");
BreakableIH(Object wrapped) {
this.wrapped = wrapped;
}
void suspend() {
state.set("suspended");
}
void kill() {
state.set("killed");
}
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
Object result;
try {
result = method.invoke(wrapped, args);
} catch (InvocationTargetException e) {
throw e.getCause();
}
String s = state.get();
if (s.equals("suspended"))
state.waitUntilEqual("killed", 3, TimeUnit.SECONDS);
else if (s.equals("killed"))
throw new IOException("Broken");
return result;
}
}
private static class Holder<T> {
private T held;
private Lock lock = new ReentrantLock();
private Condition changed = lock.newCondition();
Holder(T value) {
lock.lock();
this.held = value;
lock.unlock();
}
void waitUntilEqual(T value, long timeout, TimeUnit units)
throws InterruptedException {
long millis = units.toMillis(timeout);
long stop = System.currentTimeMillis() + millis;
Date stopDate = new Date(stop);
lock.lock();
try {
while (!value.equals(held)) {
boolean ok = changed.awaitUntil(stopDate);
if (!ok)
throw new InterruptedException("Timed out");
}
} finally {
lock.unlock();
}
}
void set(T value) {
lock.lock();
try {
held = value;
changed.signalAll();
} finally {
lock.unlock();
}
}
T get() {
lock.lock();
try {
return held;
} finally {
lock.unlock();
}
}
}
private static class StoreListener implements NotificationListener {
final BlockingQueue<Notification> queue =
new ArrayBlockingQueue<Notification>(100);
public void handleNotification(Notification n, Object h) {
queue.add(n);
}
Notification nextNotification(long time, TimeUnit units)
throws InterruptedException {
Notification n = queue.poll(time, units);
if (n == null)
throw new NoSuchElementException("Notification wait timed out");
return n;
}
int notifCount() {
return queue.size();
}
}
public static interface SenderMBean {}
public static class Sender
extends NotificationBroadcasterSupport implements SenderMBean {
private AtomicLong seqNo = new AtomicLong(0);
void send() {
Notification n =
new Notification("type", this, seqNo.getAndIncrement());
sendNotification(n);
}
}
public static void main(String[] args) throws Exception {
MBeanServer mbs = MBeanServerFactory.newMBeanServer();
Sender sender = new Sender();
ObjectName name = new ObjectName("a:b=c");
mbs.registerMBean(sender, name);
System.out.println("Creating connector server");
JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///");
JMXConnectorServer cs = JMXConnectorServerFactory.newJMXConnectorServer(
url, null, mbs);
cs.start();
StoreListener csListener = new StoreListener();
cs.addNotificationListener(csListener, null, null);
System.out.println("Creating reconnectable client");
JMXServiceURL addr = cs.getAddress();
ReconnectableJMXConnector cc = new ReconnectableJMXConnector(addr);
MBeanServerConnection mbsc = cc.getMBeanServerConnection();
System.out.println("Checking server has sent new-client notif");
Notification csn = csListener.nextNotification(1, TimeUnit.SECONDS);
assertEquals("CS notif type",
JMXConnectionNotification.OPENED, csn.getType());
StoreListener listener = new StoreListener();
mbsc.addNotificationListener(name, listener, null, null);
System.out.println("Sending 10 notifs and checking they are received");
for (int i = 0; i < 10; i++)
sender.send();
checkNotifs(listener, 0, 10);
System.out.println("Suspending the fetchNotifs operation");
cc.suspend();
System.out.println("Sending a notif while fetchNotifs is suspended");
sender.send();
System.out.println("Brief wait before checking no notif is received");
Thread.sleep(2);
// dumpThreads();
assertEquals("notif queue while connector suspended",
0, listener.notifCount());
assertEquals("connector server notif queue while connector suspended",
0, csListener.notifCount());
System.out.println("Breaking the connection so fetchNotifs will fail over");
cc.kill();
System.out.println("Checking that client has reconnected");
csn = csListener.nextNotification(1, TimeUnit.SECONDS);
assertEquals("First CS notif type after kill",
JMXConnectionNotification.CLOSED, csn.getType());
csn = csListener.nextNotification(1, TimeUnit.SECONDS);
assertEquals("Second CS notif type after kill",
JMXConnectionNotification.OPENED, csn.getType());
System.out.println("Checking that suspended notif has been received");
checkNotifs(listener, 10, 11);
}
private static void checkNotifs(
StoreListener sl, long start, long stop)
throws Exception {
for (long i = start; i < stop; i++) {
Notification n = sl.nextNotification(1, TimeUnit.SECONDS);
assertEquals("received sequence number", i, n.getSequenceNumber());
}
}
private static void assertEquals(String what, Object expect, Object actual)
throws Exception {
if (!expect.equals(actual)) {
fail(what + " should be " + expect + " but is " + actual);
}
}
private static void fail(String why) throws Exception {
throw new Exception("TEST FAILED: " + why);
}
private static void dumpThreads() {
System.out.println("Thread stack dump");
Map<Thread, StackTraceElement[]> traces = Thread.getAllStackTraces();
for (Map.Entry<Thread, StackTraceElement[]> entry : traces.entrySet()) {
Thread t = entry.getKey();
System.out.println("===Thread " + t.getName() + "===");
for (StackTraceElement ste : entry.getValue())
System.out.println(" " + ste);
}
}
}