jdk/test/javax/management/eventService/SharingThreadTest.java
changeset 4159 9e3aae7675f1
parent 4158 0b4d21bc8b5c
parent 4156 acaa49a2768a
child 4160 bda0a85afcb7
equal deleted inserted replaced
4158:0b4d21bc8b5c 4159:9e3aae7675f1
     1 /*
       
     2  * Copyright 2007-2008 Sun Microsystems, Inc.  All Rights Reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.
       
     8  *
       
     9  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    12  * version 2 for more details (a copy is included in the LICENSE file that
       
    13  * accompanied this code).
       
    14  *
       
    15  * You should have received a copy of the GNU General Public License version
       
    16  * 2 along with this work; if not, write to the Free Software Foundation,
       
    17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    18  *
       
    19  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
       
    20  * CA 95054 USA or visit www.sun.com if you need additional information or
       
    21  * have any questions.
       
    22  */
       
    23 
       
    24 /*
       
    25  * @test SharingThreadTest.java 1.3 08/01/22
       
    26  * @bug 5108776
       
    27  * @summary Basic test for EventClient to see internal thread management.
       
    28  * @author Shanliang JIANG
       
    29  * @run clean SharingThreadTest
       
    30  * @run build SharingThreadTest
       
    31  * @run main SharingThreadTest
       
    32  */
       
    33 
       
    34 import java.io.IOException;
       
    35 import java.util.concurrent.ArrayBlockingQueue;
       
    36 import java.util.concurrent.Executor;
       
    37 import java.util.concurrent.ThreadPoolExecutor;
       
    38 import java.util.concurrent.TimeUnit;
       
    39 import javax.management.MBeanServer;
       
    40 import javax.management.MBeanServerFactory;
       
    41 import javax.management.Notification;
       
    42 import javax.management.NotificationBroadcasterSupport;
       
    43 import javax.management.NotificationFilter;
       
    44 import javax.management.NotificationListener;
       
    45 import javax.management.ObjectName;
       
    46 import javax.management.event.EventClient;
       
    47 import javax.management.event.EventClientDelegate;
       
    48 import javax.management.event.EventClientDelegateMBean;
       
    49 import javax.management.event.FetchingEventRelay;
       
    50 import javax.management.event.RMIPushEventRelay;
       
    51 import javax.management.remote.JMXConnector;
       
    52 import javax.management.remote.JMXConnectorFactory;
       
    53 import javax.management.remote.JMXConnectorServer;
       
    54 import javax.management.remote.JMXConnectorServerFactory;
       
    55 import javax.management.remote.JMXServiceURL;
       
    56 
       
    57 
       
    58 public class SharingThreadTest {
       
    59 
       
    60     private static MBeanServer mbeanServer = MBeanServerFactory.createMBeanServer();
       
    61     private static ObjectName emitter;
       
    62     private static NotificationEmitter emitterImpl;
       
    63     private static JMXServiceURL url;
       
    64     private static JMXConnectorServer server;
       
    65 
       
    66 
       
    67     private static int toSend = 10;
       
    68     private static final long bigWaiting = 6000;
       
    69     private static int counter = 0;
       
    70     private static int jobs = 10;
       
    71     private static int endedJobs = 0;
       
    72 
       
    73     private static volatile String failure;
       
    74 
       
    75     private static Executor sharedExecutor = new ThreadPoolExecutor(0, 1, 1000,
       
    76             TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(jobs));
       
    77             //Executors.newFixedThreadPool(1);
       
    78 
       
    79     public static void main(String[] args) throws Exception {
       
    80         System.out.println(">>> Test on sharing threads for multiple EventClient.");
       
    81 
       
    82         // for 1.5
       
    83         if (System.getProperty("java.version").startsWith("1.5") &&
       
    84                 !mbeanServer.isRegistered(EventClientDelegateMBean.OBJECT_NAME)) {
       
    85             System.out.print("Working on "+System.getProperty("java.version")+
       
    86                     " register "+EventClientDelegateMBean.OBJECT_NAME);
       
    87 
       
    88             mbeanServer.registerMBean(EventClientDelegate.
       
    89                     getEventClientDelegate(mbeanServer),
       
    90                     EventClientDelegateMBean.OBJECT_NAME);
       
    91 
       
    92             sharedExecutor = new ThreadPoolExecutor(1, 1, 1000,
       
    93             TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(jobs));
       
    94         }
       
    95 
       
    96         emitter = new ObjectName("Default:name=NotificationEmitter");
       
    97         emitterImpl = new NotificationEmitter();
       
    98         mbeanServer.registerMBean(emitterImpl, emitter);
       
    99 
       
   100         String[] types = new String[]{"PushEventRelay", "FetchingEventRelay"};
       
   101         String[] protos = new String[]{"rmi", "iiop", "jmxmp"};
       
   102         for (String prot : protos) {
       
   103             url = new JMXServiceURL(prot, null, 0);
       
   104 
       
   105             try {
       
   106                 server =
       
   107                         JMXConnectorServerFactory.newJMXConnectorServer(url,
       
   108                         null, mbeanServer);
       
   109                 server.start();
       
   110             } catch (Exception e) {
       
   111                 System.out.println(">>> Skip "+prot+", not support.");
       
   112                 continue;
       
   113             }
       
   114 
       
   115             url = server.getAddress();
       
   116 
       
   117             // noise
       
   118             Thread noise = new Thread(new Runnable() {
       
   119                 public void run() {
       
   120                     while (true) {
       
   121                         emitterImpl.sendNotif(1, null);
       
   122                         try {
       
   123                             Thread.sleep(10);
       
   124                         } catch (Exception e) {
       
   125                             // OK
       
   126                         }
       
   127                     }
       
   128                 }
       
   129             });
       
   130             noise.setDaemon(true);
       
   131             noise.start();
       
   132 
       
   133             try {
       
   134                 for (String type: types) {
       
   135                     System.out.println("\n\n>>> Testing "+type+" on "+url+" ...");
       
   136                     JMXConnector conn = newConn();
       
   137                     try {
       
   138                         testType(type, conn);
       
   139                     } finally {
       
   140                         conn.close();
       
   141                         System.out.println(">>> Testing "+type+" on "+url+" ... done");
       
   142                     }
       
   143                 }
       
   144             } finally {
       
   145                 server.stop();
       
   146             }
       
   147         }
       
   148     }
       
   149 
       
   150     private static void testType(String type, JMXConnector conn) throws Exception {
       
   151         Thread[] threads = new Thread[jobs];
       
   152         for (int i=0; i<jobs; i++) {
       
   153             threads[i] = new Thread(new Job(type, conn));
       
   154             threads[i].setDaemon(true);
       
   155             threads[i].start();
       
   156         }
       
   157 
       
   158         // to wait
       
   159         long toWait = bigWaiting*jobs;
       
   160         long stopTime = System.currentTimeMillis() + toWait;
       
   161 
       
   162         synchronized(SharingThreadTest.class) {
       
   163             while (endedJobs < jobs && toWait > 0 && failure == null) {
       
   164                 SharingThreadTest.class.wait(toWait);
       
   165                 toWait = stopTime - System.currentTimeMillis();
       
   166             }
       
   167         }
       
   168 
       
   169         if (endedJobs != jobs && failure == null) {
       
   170             throw new RuntimeException("Need to set bigger waiting timeout?");
       
   171         }
       
   172 
       
   173         endedJobs = 0;
       
   174     }
       
   175 
       
   176     public static class Job implements Runnable {
       
   177         public Job(String type, JMXConnector conn) {
       
   178             this.type = type;
       
   179             this.conn = conn;
       
   180         }
       
   181         public void run() {
       
   182             try {
       
   183                 test(type, conn);
       
   184 
       
   185                 synchronized(SharingThreadTest.class) {
       
   186                     endedJobs++;
       
   187                     if (endedJobs>=jobs) {
       
   188                         SharingThreadTest.class.notify();
       
   189                     }
       
   190                 }
       
   191             } catch (RuntimeException re) {
       
   192                 re.printStackTrace(System.out);
       
   193                 throw re;
       
   194             } catch (Exception e) {
       
   195                 throw new RuntimeException(e);
       
   196             }
       
   197         }
       
   198 
       
   199         private final String type;
       
   200         private final JMXConnector conn;
       
   201     }
       
   202 
       
   203     private static void test(String type, JMXConnector conn) throws Exception {
       
   204         String id = getId();
       
   205 
       
   206         Listener listener = new Listener(id);
       
   207         Filter filter = new Filter(id);
       
   208 
       
   209         //newConn();
       
   210         EventClient ec = newEventClient(type, conn);
       
   211 
       
   212         System.out.println(">>> ("+id+") To receive notifications "+toSend);
       
   213         ec.addNotificationListener(emitter,
       
   214                 listener, filter, null);
       
   215 
       
   216         emitterImpl.sendNotif(toSend, id);
       
   217         listener.waitNotifs(bigWaiting, toSend);
       
   218         if (listener.received != toSend) {
       
   219             throw new RuntimeException(">>> ("+id+") Expected to receive: "
       
   220                     +toSend+", but got: "+listener.received);
       
   221         }
       
   222 
       
   223         ec.close();
       
   224     }
       
   225 
       
   226 //--------------------------
       
   227 // private classes
       
   228 //--------------------------
       
   229 
       
   230     private static class Listener implements NotificationListener {
       
   231         public Listener(String id) {
       
   232             this.id = id;
       
   233         }
       
   234         public void handleNotification(Notification notif, Object handback) {
       
   235             if (!id.equals(notif.getUserData())) {
       
   236                 System.out.println("("+id+") Filter error, my id is: "+id+
       
   237                         ", but got "+notif.getUserData());
       
   238                 System.exit(1);
       
   239             }
       
   240             System.out.println("("+id+") received "+notif.getSequenceNumber());
       
   241             synchronized (this) {
       
   242                 received++;
       
   243 
       
   244                 if (sequenceNB < 0) {
       
   245                     sequenceNB = notif.getSequenceNumber();
       
   246                 } else if(++sequenceNB != notif.getSequenceNumber()) {
       
   247                     fail("(" + id + ") Wrong sequence number, expected: "
       
   248                             +sequenceNB+", but got: "+notif.getSequenceNumber());
       
   249                 }
       
   250                 if (received >= toSend || failure != null) {
       
   251                     this.notify();
       
   252                 }
       
   253             }
       
   254         }
       
   255 
       
   256         public void waitNotifs(long timeout, int nb) throws Exception {
       
   257             long toWait = timeout;
       
   258             long stopTime = System.currentTimeMillis() + timeout;
       
   259             synchronized(this) {
       
   260                 while (received < nb && toWait > 0 && failure == null) {
       
   261                     this.wait(toWait);
       
   262                     toWait = stopTime - System.currentTimeMillis();
       
   263                 }
       
   264             }
       
   265         }
       
   266 
       
   267         private String id;
       
   268         private int received = 0;
       
   269 
       
   270         private long sequenceNB = -1;
       
   271     }
       
   272 
       
   273     private static class Filter implements NotificationFilter {
       
   274         public Filter(String id) {
       
   275             this.id = id;
       
   276         }
       
   277 
       
   278         public boolean isNotificationEnabled(Notification n) {
       
   279             return id.equals(n.getUserData());
       
   280         }
       
   281         private String id;
       
   282     }
       
   283 
       
   284     public static class NotificationEmitter extends NotificationBroadcasterSupport
       
   285             implements NotificationEmitterMBean {
       
   286 
       
   287         /**
       
   288          * Send Notification objects.
       
   289          *
       
   290          * @param nb The number of notifications to send
       
   291          */
       
   292         public void sendNotif(int nb, String userData) {
       
   293             new Thread(new SendJob(nb, userData)).start();
       
   294         }
       
   295 
       
   296         private class SendJob implements Runnable {
       
   297             public SendJob(int nb, String userData) {
       
   298                 this.nb = nb;
       
   299                 this.userData = userData;
       
   300             }
       
   301 
       
   302             public void run() {
       
   303                 if (userData != null) {
       
   304                     System.out.println(">>> ("+userData+") sending "+nb);
       
   305                 }
       
   306                 long sequenceNumber = 0;
       
   307                 for (int i = 0; i<nb; i++) {
       
   308                     Notification notif = new Notification(myType, emitter,
       
   309                             sequenceNumber++);
       
   310                     notif.setUserData(userData);
       
   311                     sendNotification(notif);
       
   312                     Thread.yield();
       
   313                     try {
       
   314                         Thread.sleep(1);
       
   315                     } catch (Exception e) {}
       
   316                 }
       
   317                 if (userData != null) {
       
   318                     System.out.println(">>> ("+userData+") sending done");
       
   319                 }
       
   320             }
       
   321             private int nb;
       
   322             private String userData;
       
   323         }
       
   324         private final String myType = "notification.my_notification";
       
   325     }
       
   326 
       
   327     public interface NotificationEmitterMBean {
       
   328         public void sendNotif(int nb, String userData);
       
   329     }
       
   330 
       
   331     private static JMXConnector newConn() throws IOException {
       
   332         return JMXConnectorFactory.connect(url);
       
   333     }
       
   334 
       
   335     private static EventClient newEventClient(String type, JMXConnector conn)
       
   336             throws Exception {
       
   337         EventClientDelegateMBean proxy =
       
   338                 EventClientDelegate.getProxy(conn.getMBeanServerConnection());
       
   339         if (type.equals("PushEventRelay")) {
       
   340             return new EventClient(proxy,
       
   341                     new RMIPushEventRelay(proxy), sharedExecutor, null, 600);
       
   342         } else if (type.equals("FetchingEventRelay")) {
       
   343             return new EventClient(proxy,
       
   344                     new FetchingEventRelay(proxy,
       
   345                     FetchingEventRelay.DEFAULT_BUFFER_SIZE,
       
   346                     10,
       
   347                     FetchingEventRelay.DEFAULT_MAX_NOTIFICATIONS,
       
   348                     sharedExecutor),
       
   349                     null, null, 600);
       
   350         } else {
       
   351             throw new RuntimeException("Wrong event client type: "+type);
       
   352         }
       
   353     }
       
   354 
       
   355     private static String getId() {
       
   356         synchronized(SharingThreadTest.class) {
       
   357             return String.valueOf(counter++);
       
   358         }
       
   359     }
       
   360 
       
   361     private static void fail(String msg) {
       
   362         System.out.println("FAIL: " + msg);
       
   363         failure = msg;
       
   364     }
       
   365 }