test/jdk/sun/net/www/protocol/https/TestHttpsServer.java
changeset 55031 37a077319427
parent 47216 71c04702a3d5
child 57778 6768b0f490df
equal deleted inserted replaced
55030:703b2c04fc2c 55031:37a077319427
     1 /*
     1 /*
     2  * Copyright (c) 2002, 2012, Oracle and/or its affiliates. All rights reserved.
     2  * Copyright (c) 2002, 2019, Oracle and/or its affiliates. All rights reserved.
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
     4  *
     4  *
     5  * This code is free software; you can redistribute it and/or modify it
     5  * This code is free software; you can redistribute it and/or modify it
     6  * under the terms of the GNU General Public License version 2 only, as
     6  * under the terms of the GNU General Public License version 2 only, as
     7  * published by the Free Software Foundation.
     7  * published by the Free Software Foundation.
    68      * and up to ten TCP connections will be handled simultaneously.
    68      * and up to ten TCP connections will be handled simultaneously.
    69      * @param cb the callback object which is invoked to handle each
    69      * @param cb the callback object which is invoked to handle each
    70      *  incoming request
    70      *  incoming request
    71      */
    71      */
    72 
    72 
    73     public TestHttpsServer (HttpCallback cb) throws IOException {
    73     public TestHttpsServer(HttpCallback cb) throws IOException {
    74         this (cb, 1, 10, 0);
    74         this(cb, 1, 10, 0);
    75     }
    75     }
    76 
    76 
    77     /**
    77     /**
    78      * Create a <code>TestHttpsServer<code> instance with the specified number of
    78      * Create a <code>TestHttpsServer<code> instance with the specified number of
    79      * threads and maximum number of connections per thread. This functions
    79      * threads and maximum number of connections per thread. This functions
    84      *     in parallel
    84      *     in parallel
    85      * @param cperthread the number of simultaneous TCP connections to
    85      * @param cperthread the number of simultaneous TCP connections to
    86      *     handle per thread
    86      *     handle per thread
    87      */
    87      */
    88 
    88 
    89     public TestHttpsServer (HttpCallback cb, int threads, int cperthread)
    89     public TestHttpsServer(HttpCallback cb, int threads, int cperthread)
    90         throws IOException {
    90         throws IOException {
    91         this (cb, threads, cperthread, 0);
    91         this(cb, threads, cperthread, 0);
    92     }
    92     }
    93 
    93 
    94     /**
    94     /**
    95      * Create a <code>TestHttpsServer<code> instance with the specified number
    95      * Create a <code>TestHttpsServer<code> instance with the specified number
    96      * of threads and maximum number of connections per thread and running on
    96      * of threads and maximum number of connections per thread and running on
   104      * @param cperthread the number of simultaneous TCP connections
   104      * @param cperthread the number of simultaneous TCP connections
   105      *  to handle per thread
   105      *  to handle per thread
   106      * @param port the port number to bind the server to. <code>Zero</code>
   106      * @param port the port number to bind the server to. <code>Zero</code>
   107      *  means choose any free port.
   107      *  means choose any free port.
   108      */
   108      */
   109 
   109     public TestHttpsServer(HttpCallback cb, int threads, int cperthread, int port)
   110     public TestHttpsServer (HttpCallback cb, int threads, int cperthread, int port)
       
   111         throws IOException {
   110         throws IOException {
   112         schan = ServerSocketChannel.open ();
   111         this(cb, threads, cperthread, null, port);
   113         InetSocketAddress addr = new InetSocketAddress (port);
   112     }
   114         schan.socket().bind (addr);
   113 
       
   114     /**
       
   115      * Create a <code>TestHttpsServer<code> instance with the specified number
       
   116      * of threads and maximum number of connections per thread and running on
       
   117      * the specified port. The specified number of threads are created to
       
   118      * handle incoming requests, and each thread is allowed
       
   119      * to handle a number of simultaneous TCP connections.
       
   120      * @param cb the callback object which is invoked to handle
       
   121      *  each incoming request
       
   122      * @param threads the number of threads to create to handle
       
   123      *  requests in parallel
       
   124      * @param cperthread the number of simultaneous TCP connections
       
   125      *  to handle per thread
       
   126      * @param address the InetAddress to bind to. {@code Null} means the
       
   127      *  wildcard address.
       
   128      * @param port the port number to bind the server to. {@code Zero}
       
   129      *  means choose any free port.
       
   130      */
       
   131 
       
   132     public TestHttpsServer(HttpCallback cb, int threads, int cperthread, InetAddress address, int port)
       
   133         throws IOException {
       
   134         schan = ServerSocketChannel.open();
       
   135         InetSocketAddress addr = new InetSocketAddress(address, port);
       
   136         schan.socket().bind(addr);
   115         this.threads = threads;
   137         this.threads = threads;
   116         this.cb = cb;
   138         this.cb = cb;
   117         this.cperthread = cperthread;
   139         this.cperthread = cperthread;
   118 
   140 
   119         try {
   141         try {
   133 
   155 
   134             sslCtx = SSLContext.getInstance("TLS");
   156             sslCtx = SSLContext.getInstance("TLS");
   135 
   157 
   136             sslCtx.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
   158             sslCtx.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
   137 
   159 
   138             servers = new Server [threads];
   160             servers = new Server[threads];
   139             for (int i=0; i<threads; i++) {
   161             for (int i=0; i<threads; i++) {
   140                 servers[i] = new Server (cb, schan, cperthread);
   162                 servers[i] = new Server(cb, schan, cperthread);
   141                 servers[i].start();
   163                 servers[i].start();
   142             }
   164             }
   143         } catch (Exception ex) {
   165         } catch (Exception ex) {
   144             throw new RuntimeException("test failed. cause: "+ex.getMessage());
   166             throw new RuntimeException("test failed. cause: "+ex.getMessage());
   145         }
   167         }
   148     /** Tell all threads in the server to exit within 5 seconds.
   170     /** Tell all threads in the server to exit within 5 seconds.
   149      *  This is an abortive termination. Just prior to the thread exiting
   171      *  This is an abortive termination. Just prior to the thread exiting
   150      *  all channels in that thread waiting to be closed are forceably closed.
   172      *  all channels in that thread waiting to be closed are forceably closed.
   151      */
   173      */
   152 
   174 
   153     public void terminate () {
   175     public void terminate() {
   154         for (int i=0; i<threads; i++) {
   176         for (int i=0; i<threads; i++) {
   155             servers[i].terminate ();
   177             servers[i].terminate ();
   156         }
   178         }
   157     }
   179     }
   158 
   180 
   161      * @return the local port number
   183      * @return the local port number
   162      */
   184      */
   163 
   185 
   164     public int getLocalPort () {
   186     public int getLocalPort () {
   165         return schan.socket().getLocalPort ();
   187         return schan.socket().getLocalPort ();
       
   188     }
       
   189 
       
   190     public String getAuthority() {
       
   191         InetAddress address = schan.socket().getInetAddress();
       
   192         String hostaddr = address.getHostAddress();
       
   193         if (address.isAnyLocalAddress()) hostaddr = "localhost";
       
   194         if (hostaddr.indexOf(':') > -1) hostaddr = "[" + hostaddr + "]";
       
   195         return hostaddr + ":" + getLocalPort();
   166     }
   196     }
   167 
   197 
   168     static class Server extends Thread {
   198     static class Server extends Thread {
   169 
   199 
   170         ServerSocketChannel schan;
   200         ServerSocketChannel schan;
   176         int maxconn;
   206         int maxconn;
   177         int nconn;
   207         int nconn;
   178         ClosedChannelList clist;
   208         ClosedChannelList clist;
   179         boolean shutdown;
   209         boolean shutdown;
   180 
   210 
   181         Server (HttpCallback cb, ServerSocketChannel schan, int maxconn) {
   211         Server(HttpCallback cb, ServerSocketChannel schan, int maxconn) {
   182             this.schan = schan;
   212             this.schan = schan;
   183             this.maxconn = maxconn;
   213             this.maxconn = maxconn;
   184             this.cb = cb;
   214             this.cb = cb;
   185             nconn = 0;
   215             nconn = 0;
   186             consumeBuffer = ByteBuffer.allocate (512);
   216             consumeBuffer = ByteBuffer.allocate(512);
   187             clist = new ClosedChannelList ();
   217             clist = new ClosedChannelList();
   188             try {
   218             try {
   189                 selector = Selector.open ();
   219                 selector = Selector.open();
   190                 schan.configureBlocking (false);
   220                 schan.configureBlocking(false);
   191                 listenerKey = schan.register (selector, SelectionKey.OP_ACCEPT);
   221                 listenerKey = schan.register(selector, SelectionKey.OP_ACCEPT);
   192             } catch (IOException e) {
   222             } catch (IOException e) {
   193                 System.err.println ("Server could not start: " + e);
   223                 System.err.println("Server could not start: " + e);
   194             }
   224             }
   195         }
   225         }
   196 
   226 
   197         /* Stop the thread as soon as possible */
   227         /* Stop the thread as soon as possible */
   198         public synchronized void terminate () {
   228         public synchronized void terminate() {
   199             shutdown = true;
   229             shutdown = true;
   200         }
   230         }
   201 
   231 
   202         public void run ()  {
   232         public void run()  {
   203             try {
   233             try {
   204                 while (true) {
   234                 while (true) {
   205                     selector.select (1000);
   235                     selector.select(1000);
   206                     Set selected = selector.selectedKeys();
   236                     Set selected = selector.selectedKeys();
   207                     Iterator iter = selected.iterator();
   237                     Iterator iter = selected.iterator();
   208                     while (iter.hasNext()) {
   238                     while (iter.hasNext()) {
   209                         key = (SelectionKey)iter.next();
   239                         key = (SelectionKey)iter.next();
   210                         if (key.equals (listenerKey)) {
   240                         if (key.equals (listenerKey)) {
   211                             SocketChannel sock = schan.accept ();
   241                             SocketChannel sock = schan.accept();
   212                             if (sock == null) {
   242                             if (sock == null) {
   213                                 /* false notification */
   243                                 /* false notification */
   214                                 iter.remove();
   244                                 iter.remove();
   215                                 continue;
   245                                 continue;
   216                             }
   246                             }
   217                             sock.configureBlocking (true);
   247                             sock.configureBlocking(true);
   218                             SSLEngine sslEng = sslCtx.createSSLEngine();
   248                             SSLEngine sslEng = sslCtx.createSSLEngine();
   219                             sslEng.setUseClientMode(false);
   249                             sslEng.setUseClientMode(false);
   220                             new ServerWorker(cb, sock, sslEng).start();
   250                             new ServerWorker(cb, sock, sslEng).start();
   221                             nconn ++;
   251                             nconn ++;
   222                             if (nconn == maxconn) {
   252                             if (nconn == maxconn) {
   223                                 /* deregister */
   253                                 /* deregister */
   224                                 listenerKey.cancel ();
   254                                 listenerKey.cancel();
   225                                 listenerKey = null;
   255                                 listenerKey = null;
   226                             }
   256                             }
   227                         } else {
   257                         } else {
   228                             if (key.isReadable()) {
   258                             if (key.isReadable()) {
   229                                 boolean closed = false;
   259                                 boolean closed = false;
   230                                 SocketChannel chan = (SocketChannel) key.channel();
   260                                 SocketChannel chan = (SocketChannel)key.channel();
   231                                 if (key.attachment() != null) {
   261                                 if (key.attachment() != null) {
   232                                     closed = consume (chan);
   262                                     closed = consume(chan);
   233                                 }
   263                                 }
   234 
   264 
   235                                 if (closed) {
   265                                 if (closed) {
   236                                     chan.close ();
   266                                     chan.close();
   237                                     key.cancel ();
   267                                     key.cancel();
   238                                     if (nconn == maxconn) {
   268                                     if (nconn == maxconn) {
   239                                         listenerKey = schan.register (selector, SelectionKey.OP_ACCEPT);
   269                                         listenerKey = schan.register(selector, SelectionKey.OP_ACCEPT);
   240                                     }
   270                                     }
   241                                     nconn --;
   271                                     nconn --;
   242                                 }
   272                                 }
   243                             }
   273                             }
   244                         }
   274                         }
   246                     }
   276                     }
   247                     clist.check();
   277                     clist.check();
   248 
   278 
   249                     synchronized (this) {
   279                     synchronized (this) {
   250                         if (shutdown) {
   280                         if (shutdown) {
   251                             clist.terminate ();
   281                             clist.terminate();
   252                             return;
   282                             return;
   253                         }
   283                         }
   254                     }
   284                     }
   255                 }
   285                 }
   256             } catch (IOException e) {
   286             } catch (IOException e) {
   257                 System.out.println ("Server exception: " + e);
   287                 System.out.println("Server exception: " + e);
   258                 // TODO finish
   288                 // TODO finish
   259             }
   289             }
   260         }
   290         }
   261 
   291 
   262         /* read all the data off the channel without looking at it
   292         /* read all the data off the channel without looking at it
   263              * return true if connection closed
   293          * return true if connection closed
   264              */
   294          */
   265         boolean consume (SocketChannel chan) {
   295         boolean consume(SocketChannel chan) {
   266             try {
   296             try {
   267                 consumeBuffer.clear ();
   297                 consumeBuffer.clear();
   268                 int c = chan.read (consumeBuffer);
   298                 int c = chan.read(consumeBuffer);
   269                 if (c == -1)
   299                 if (c == -1)
   270                     return true;
   300                     return true;
   271             } catch (IOException e) {
   301             } catch (IOException e) {
   272                 return true;
   302                 return true;
   273             }
   303             }
   296         /*
   326         /*
   297          * Application buffers, also used for handshaking
   327          * Application buffers, also used for handshaking
   298          */
   328          */
   299         private int appBBSize;
   329         private int appBBSize;
   300 
   330 
   301         ServerWorker (HttpCallback cb, SocketChannel schan, SSLEngine sslEng) {
   331         ServerWorker(HttpCallback cb, SocketChannel schan, SSLEngine sslEng) {
   302             this.sslEng = sslEng;
   332             this.sslEng = sslEng;
   303             this.schan = schan;
   333             this.schan = schan;
   304             this.cb = cb;
   334             this.cb = cb;
   305             currentHSStatus = HandshakeStatus.NEED_UNWRAP;
   335             currentHSStatus = HandshakeStatus.NEED_UNWRAP;
   306             initialHSComplete = false;
   336             initialHSComplete = false;
   429             }
   459             }
   430         }
   460         }
   431 
   461 
   432         /* return true if the connection is closed, false otherwise */
   462         /* return true if the connection is closed, false otherwise */
   433 
   463 
   434         private boolean read (SocketChannel chan, SSLEngine sslEng) {
   464         private boolean read(SocketChannel chan, SSLEngine sslEng) {
   435             HttpTransaction msg;
   465             HttpTransaction msg;
   436             boolean res;
   466             boolean res;
   437             try {
   467             try {
   438                 InputStream is = new BufferedInputStream (new NioInputStream (chan, sslEng, inNetBB, inAppBB));
   468                 InputStream is = new BufferedInputStream(new NioInputStream(chan, sslEng, inNetBB, inAppBB));
   439                 String requestline = readLine (is);
   469                 String requestline = readLine(is);
   440                 MessageHeader mhead = new MessageHeader (is);
   470                 MessageHeader mhead = new MessageHeader(is);
   441                 String clen = mhead.findValue ("Content-Length");
   471                 String clen = mhead.findValue("Content-Length");
   442                 String trferenc = mhead.findValue ("Transfer-Encoding");
   472                 String trferenc = mhead.findValue("Transfer-Encoding");
   443                 String data = null;
   473                 String data = null;
   444                 if (trferenc != null && trferenc.equals ("chunked"))
   474                 if (trferenc != null && trferenc.equals("chunked"))
   445                     data = new String (readChunkedData (is));
   475                     data = new String(readChunkedData(is));
   446                 else if (clen != null)
   476                 else if (clen != null)
   447                     data = new String (readNormalData (is, Integer.parseInt (clen)));
   477                     data = new String(readNormalData(is, Integer.parseInt(clen)));
   448                 String[] req = requestline.split (" ");
   478                 String[] req = requestline.split(" ");
   449                 if (req.length < 2) {
   479                 if (req.length < 2) {
   450                     /* invalid request line */
   480                     /* invalid request line */
   451                     return false;
   481                     return false;
   452                 }
   482                 }
   453                 String cmd = req[0];
   483                 String cmd = req[0];
   454                 URI uri = null;
   484                 URI uri = null;
   455                 try {
   485                 try {
   456                     uri = new URI (req[1]);
   486                     uri = new URI(req[1]);
   457                     msg = new HttpTransaction (this, cmd, uri, mhead, data, null, chan);
   487                     msg = new HttpTransaction(this, cmd, uri, mhead, data, null, chan);
   458                     cb.request (msg);
   488                     cb.request(msg);
   459                 } catch (URISyntaxException e) {
   489                 } catch (URISyntaxException e) {
   460                     System.err.println ("Invalid URI: " + e);
   490                     System.err.println ("Invalid URI: " + e);
   461                     msg = new HttpTransaction (this, cmd, null, null, null, null, chan);
   491                     msg = new HttpTransaction(this, cmd, null, null, null, null, chan);
   462                     msg.sendResponse (501, "Whatever");
   492                     msg.sendResponse(501, "Whatever");
   463                 }
   493                 }
   464                 res = false;
   494                 res = false;
   465             } catch (IOException e) {
   495             } catch (IOException e) {
   466                 res = true;
   496                 res = true;
   467             }
   497             }
   468             return res;
   498             return res;
   469         }
   499         }
   470 
   500 
   471         byte[] readNormalData (InputStream is, int len) throws IOException {
   501         byte[] readNormalData(InputStream is, int len) throws IOException {
   472             byte [] buf  = new byte [len];
   502             byte[] buf  = new byte[len];
   473             int c, off=0, remain=len;
   503             int c, off=0, remain=len;
   474             while (remain > 0 && ((c=is.read (buf, off, remain))>0)) {
   504             while (remain > 0 && ((c=is.read (buf, off, remain))>0)) {
   475                 remain -= c;
   505                 remain -= c;
   476                 off += c;
   506                 off += c;
   477             }
   507             }
   487                 throw new IOException(
   517                 throw new IOException(
   488                     "Expected <CR><LF>:  got '" + cr + "/" + lf + "'");
   518                     "Expected <CR><LF>:  got '" + cr + "/" + lf + "'");
   489             }
   519             }
   490         }
   520         }
   491 
   521 
   492         byte[] readChunkedData (InputStream is) throws IOException {
   522         byte[] readChunkedData(InputStream is) throws IOException {
   493             LinkedList l = new LinkedList ();
   523             LinkedList l = new LinkedList();
   494             int total = 0;
   524             int total = 0;
   495             for (int len=readChunkLen(is); len!=0; len=readChunkLen(is)) {
   525             for (int len=readChunkLen(is); len!=0; len=readChunkLen(is)) {
   496                 l.add (readNormalData(is, len));
   526                 l.add(readNormalData(is, len));
   497                 total += len;
   527                 total += len;
   498                 readCRLF(is); // CRLF at end of chunk
   528                 readCRLF(is); // CRLF at end of chunk
   499             }
   529             }
   500             readCRLF(is); // CRLF at end of Chunked Stream.
   530             readCRLF(is); // CRLF at end of Chunked Stream.
   501             byte[] buf = new byte [total];
   531             byte[] buf = new byte[total];
   502             Iterator i = l.iterator();
   532             Iterator i = l.iterator();
   503             int x = 0;
   533             int x = 0;
   504             while (i.hasNext()) {
   534             while (i.hasNext()) {
   505                 byte[] b = (byte[])i.next();
   535                 byte[] b = (byte[])i.next();
   506                 System.arraycopy (b, 0, buf, x, b.length);
   536                 System.arraycopy(b, 0, buf, x, b.length);
   507                 x += b.length;
   537                 x += b.length;
   508             }
   538             }
   509             return buf;
   539             return buf;
   510         }
   540         }
   511 
   541 
   512         private int readChunkLen (InputStream is) throws IOException {
   542         private int readChunkLen(InputStream is) throws IOException {
   513             int c, len=0;
   543             int c, len=0;
   514             boolean done=false, readCR=false;
   544             boolean done=false, readCR=false;
   515             while (!done) {
   545             while (!done) {
   516                 c = is.read ();
   546                 c = is.read();
   517                 if (c == '\n' && readCR) {
   547                 if (c == '\n' && readCR) {
   518                     done = true;
   548                     done = true;
   519                 } else {
   549                 } else {
   520                     if (c == '\r' && !readCR) {
   550                     if (c == '\r' && !readCR) {
   521                         readCR = true;
   551                         readCR = true;
   533                 }
   563                 }
   534             }
   564             }
   535             return len;
   565             return len;
   536         }
   566         }
   537 
   567 
   538         private String readLine (InputStream is) throws IOException {
   568         private String readLine(InputStream is) throws IOException {
   539             boolean done=false, readCR=false;
   569             boolean done=false, readCR=false;
   540             byte[] b = new byte [512];
   570             byte[] b = new byte[512];
   541             int c, l = 0;
   571             int c, l = 0;
   542 
   572 
   543             while (!done) {
   573             while (!done) {
   544                 c = is.read ();
   574                 c = is.read();
   545                 if (c == '\n' && readCR) {
   575                 if (c == '\n' && readCR) {
   546                     done = true;
   576                     done = true;
   547                 } else {
   577                 } else {
   548                     if (c == '\r' && !readCR) {
   578                     if (c == '\r' && !readCR) {
   549                         readCR = true;
   579                         readCR = true;
   550                     } else {
   580                     } else {
   551                         b[l++] = (byte)c;
   581                         b[l++] = (byte)c;
   552                     }
   582                     }
   553                 }
   583                 }
   554             }
   584             }
   555             return new String (b);
   585             return new String(b);
   556         }
   586         }
   557 
   587 
   558         /** close the channel associated with the current key by:
   588         /** close the channel associated with the current key by:
   559          * 1. shutdownOutput (send a FIN)
   589          * 1. shutdownOutput (send a FIN)
   560          * 2. mark the key so that incoming data is to be consumed and discarded
   590          * 2. mark the key so that incoming data is to be consumed and discarded
   561          * 3. After a period, close the socket
   591          * 3. After a period, close the socket
   562          */
   592          */
   563 
   593 
   564         synchronized void orderlyCloseChannel (SocketChannel ch) throws IOException {
   594         synchronized void orderlyCloseChannel(SocketChannel ch) throws IOException {
   565             ch.socket().shutdownOutput();
   595             ch.socket().shutdownOutput();
   566         }
   596         }
   567 
   597 
   568         synchronized void abortiveCloseChannel (SocketChannel ch) throws IOException {
   598         synchronized void abortiveCloseChannel(SocketChannel ch) throws IOException {
   569             Socket s = ch.socket ();
   599             Socket s = ch.socket();
   570             s.setSoLinger (true, 0);
   600             s.setSoLinger(true, 0);
   571             ch.close();
   601             ch.close();
   572         }
   602         }
   573     }
   603     }
   574 
   604 
   575 
   605 
   590         ByteBuffer markBuf; /* reads may be satisifed from this buffer */
   620         ByteBuffer markBuf; /* reads may be satisifed from this buffer */
   591         boolean marked;
   621         boolean marked;
   592         boolean reset;
   622         boolean reset;
   593         int readlimit;
   623         int readlimit;
   594 
   624 
   595         public NioInputStream (SocketChannel chan, SSLEngine sslEng, ByteBuffer inNetBB, ByteBuffer inAppBB) throws IOException {
   625         public NioInputStream(SocketChannel chan, SSLEngine sslEng, ByteBuffer inNetBB, ByteBuffer inAppBB) throws IOException {
   596             this.sslEng = sslEng;
   626             this.sslEng = sslEng;
   597             this.channel = chan;
   627             this.channel = chan;
   598             selector = Selector.open();
   628             selector = Selector.open();
   599             this.inNetBB = inNetBB;
   629             this.inNetBB = inNetBB;
   600             this.inAppBB = inAppBB;
   630             this.inAppBB = inAppBB;
   601             key = chan.register (selector, SelectionKey.OP_READ);
   631             key = chan.register(selector, SelectionKey.OP_READ);
   602             available = 0;
   632             available = 0;
   603             one = new byte[1];
   633             one = new byte[1];
   604             closed = marked = reset = false;
   634             closed = marked = reset = false;
   605         }
   635         }
   606 
   636 
   607         public synchronized int read (byte[] b) throws IOException {
   637         public synchronized int read(byte[] b) throws IOException {
   608             return read (b, 0, b.length);
   638             return read(b, 0, b.length);
   609         }
   639         }
   610 
   640 
   611         public synchronized int read () throws IOException {
   641         public synchronized int read() throws IOException {
   612             return read (one, 0, 1);
   642             return read(one, 0, 1);
   613         }
   643         }
   614 
   644 
   615         public synchronized int read (byte[] b, int off, int srclen) throws IOException {
   645         public synchronized int read(byte[] b, int off, int srclen) throws IOException {
   616 
   646 
   617             int canreturn, willreturn;
   647             int canreturn, willreturn;
   618 
   648 
   619             if (closed)
   649             if (closed)
   620                 return -1;
   650                 return -1;
   621 
   651 
   622             if (reset) { /* satisfy from markBuf */
   652             if (reset) { /* satisfy from markBuf */
   623                 canreturn = markBuf.remaining ();
   653                 canreturn = markBuf.remaining();
   624                 willreturn = canreturn>srclen ? srclen : canreturn;
   654                 willreturn = canreturn > srclen ? srclen : canreturn;
   625                 markBuf.get(b, off, willreturn);
   655                 markBuf.get(b, off, willreturn);
   626                 if (canreturn == willreturn) {
   656                 if (canreturn == willreturn) {
   627                     reset = false;
   657                     reset = false;
   628                 }
   658                 }
   629             } else { /* satisfy from channel */
   659             } else { /* satisfy from channel */
   630                 canreturn = available();
   660                 canreturn = available();
   631                 if (canreturn == 0) {
   661                 if (canreturn == 0) {
   632                     block ();
   662                     block();
   633                     canreturn = available();
   663                     canreturn = available();
   634                 }
   664                 }
   635                 willreturn = canreturn>srclen ? srclen : canreturn;
   665                 willreturn = canreturn > srclen ? srclen : canreturn;
   636                 inAppBB.get(b, off, willreturn);
   666                 inAppBB.get(b, off, willreturn);
   637                 available -= willreturn;
   667                 available -= willreturn;
   638 
   668 
   639                 if (marked) { /* copy into markBuf */
   669                 if (marked) { /* copy into markBuf */
   640                     try {
   670                     try {
   641                         markBuf.put (b, off, willreturn);
   671                         markBuf.put(b, off, willreturn);
   642                     } catch (BufferOverflowException e) {
   672                     } catch (BufferOverflowException e) {
   643                         marked = false;
   673                         marked = false;
   644                     }
   674                     }
   645                 }
   675                 }
   646             }
   676             }
   647             return willreturn;
   677             return willreturn;
   648         }
   678         }
   649 
   679 
   650         public synchronized int available () throws IOException {
   680         public synchronized int available() throws IOException {
   651             if (closed)
   681             if (closed)
   652                 throw new IOException ("Stream is closed");
   682                 throw new IOException("Stream is closed");
   653 
   683 
   654             if (reset)
   684             if (reset)
   655                 return markBuf.remaining();
   685                 return markBuf.remaining();
   656 
   686 
   657             if (available > 0)
   687             if (available > 0)
   658                 return available;
   688                 return available;
   659 
   689 
   660             inAppBB.clear ();
   690             inAppBB.clear();
   661             int bytes = channel.read (inNetBB);
   691             int bytes = channel.read(inNetBB);
   662 
   692 
   663             int needed = sslEng.getSession().getApplicationBufferSize();
   693             int needed = sslEng.getSession().getApplicationBufferSize();
   664             if (needed > inAppBB.remaining()) {
   694             if (needed > inAppBB.remaining()) {
   665                 inAppBB = ByteBuffer.allocate(needed);
   695                 inAppBB = ByteBuffer.allocate(needed);
   666             }
   696             }
   670             available = result.bytesProduced();
   700             available = result.bytesProduced();
   671 
   701 
   672             if (available > 0)
   702             if (available > 0)
   673                 inAppBB.flip();
   703                 inAppBB.flip();
   674             else if (available == -1)
   704             else if (available == -1)
   675                 throw new IOException ("Stream is closed");
   705                 throw new IOException("Stream is closed");
   676             return available;
   706             return available;
   677         }
   707         }
   678 
   708 
   679         /**
   709         /**
   680          * block() only called when available==0 and buf is empty
   710          * block() only called when available==0 and buf is empty
   681          */
   711          */
   682         private synchronized void block () throws IOException {
   712         private synchronized void block() throws IOException {
   683             //assert available == 0;
   713             //assert available == 0;
   684             int n = selector.select ();
   714             int n = selector.select();
   685             //assert n == 1;
   715             //assert n == 1;
   686             selector.selectedKeys().clear();
   716             selector.selectedKeys().clear();
   687             available ();
   717             available();
   688         }
   718         }
   689 
   719 
   690         public void close () throws IOException {
   720         public void close() throws IOException {
   691             if (closed)
   721             if (closed)
   692                 return;
   722                 return;
   693             channel.close ();
   723             channel.close();
   694             closed = true;
   724             closed = true;
   695         }
   725         }
   696 
   726 
   697         public synchronized void mark (int readlimit) {
   727         public synchronized void mark(int readlimit) {
   698             if (closed)
   728             if (closed)
   699                 return;
   729                 return;
   700             this.readlimit = readlimit;
   730             this.readlimit = readlimit;
   701             markBuf = ByteBuffer.allocate (readlimit);
   731             markBuf = ByteBuffer.allocate(readlimit);
   702             marked = true;
   732             marked = true;
   703             reset = false;
   733             reset = false;
   704         }
   734         }
   705 
   735 
   706         public synchronized void reset () throws IOException {
   736         public synchronized void reset() throws IOException {
   707             if (closed )
   737             if (closed )
   708                 return;
   738                 return;
   709             if (!marked)
   739             if (!marked)
   710                 throw new IOException ("Stream not marked");
   740                 throw new IOException("Stream not marked");
   711             marked = false;
   741             marked = false;
   712             reset = true;
   742             reset = true;
   713             markBuf.flip ();
   743             markBuf.flip();
   714         }
   744         }
   715     }
   745     }
   716 
   746 
   717     static class NioOutputStream extends OutputStream {
   747     static class NioOutputStream extends OutputStream {
   718         SSLEngine sslEng;
   748         SSLEngine sslEng;
   722         SelectionKey key;
   752         SelectionKey key;
   723         Selector selector;
   753         Selector selector;
   724         boolean closed;
   754         boolean closed;
   725         byte[] one;
   755         byte[] one;
   726 
   756 
   727         public NioOutputStream (SocketChannel channel, SSLEngine sslEng, ByteBuffer outNetBB, ByteBuffer outAppBB) throws IOException {
   757         public NioOutputStream(SocketChannel channel, SSLEngine sslEng, ByteBuffer outNetBB, ByteBuffer outAppBB) throws IOException {
   728             this.sslEng = sslEng;
   758             this.sslEng = sslEng;
   729             this.channel = channel;
   759             this.channel = channel;
   730             this.outNetBB = outNetBB;
   760             this.outNetBB = outNetBB;
   731             this.outAppBB = outAppBB;
   761             this.outAppBB = outAppBB;
   732             selector = Selector.open ();
   762             selector = Selector.open();
   733             key = channel.register (selector, SelectionKey.OP_WRITE);
   763             key = channel.register(selector, SelectionKey.OP_WRITE);
   734             closed = false;
   764             closed = false;
   735             one = new byte [1];
   765             one = new byte[1];
   736         }
   766         }
   737 
   767 
   738         public synchronized void write (int b) throws IOException {
   768         public synchronized void write(int b) throws IOException {
   739             one[0] = (byte)b;
   769             one[0] = (byte)b;
   740             write (one, 0, 1);
   770             write(one, 0, 1);
   741         }
   771         }
   742 
   772 
   743         public synchronized void write (byte[] b) throws IOException {
   773         public synchronized void write(byte[] b) throws IOException {
   744             write (b, 0, b.length);
   774             write(b, 0, b.length);
   745         }
   775         }
   746 
   776 
   747         public synchronized void write (byte[] b, int off, int len) throws IOException {
   777         public synchronized void write(byte[] b, int off, int len) throws IOException {
   748             if (closed)
   778             if (closed)
   749                 throw new IOException ("stream is closed");
   779                 throw new IOException("stream is closed");
   750 
   780 
   751             outAppBB = ByteBuffer.allocate (len);
   781             outAppBB = ByteBuffer.allocate(len);
   752             outAppBB.put (b, off, len);
   782             outAppBB.put(b, off, len);
   753             outAppBB.flip ();
   783             outAppBB.flip();
   754             int n;
   784             int n;
   755             outNetBB.clear();
   785             outNetBB.clear();
   756             int needed = sslEng.getSession().getPacketBufferSize();
   786             int needed = sslEng.getSession().getPacketBufferSize();
   757             if (outNetBB.capacity() < needed) {
   787             if (outNetBB.capacity() < needed) {
   758                 outNetBB = ByteBuffer.allocate(needed);
   788                 outNetBB = ByteBuffer.allocate(needed);
   762             int newLen = ret.bytesProduced();
   792             int newLen = ret.bytesProduced();
   763             while ((n = channel.write (outNetBB)) < newLen) {
   793             while ((n = channel.write (outNetBB)) < newLen) {
   764                 newLen -= n;
   794                 newLen -= n;
   765                 if (newLen == 0)
   795                 if (newLen == 0)
   766                     return;
   796                     return;
   767                 selector.select ();
   797                 selector.select();
   768                 selector.selectedKeys().clear ();
   798                 selector.selectedKeys().clear();
   769             }
   799             }
   770         }
   800         }
   771 
   801 
   772         public void close () throws IOException {
   802         public void close() throws IOException {
   773             if (closed)
   803             if (closed)
   774                 return;
   804                 return;
   775             channel.close ();
   805             channel.close();
   776             closed = true;
   806             closed = true;
   777         }
   807         }
   778     }
   808     }
   779 
   809 
   780     /**
   810     /**
   800     /*
   830     /*
   801      * Modifiable int object
   831      * Modifiable int object
   802      */
   832      */
   803     private static class IValue {
   833     private static class IValue {
   804         int v;
   834         int v;
   805         IValue (int i) {
   835         IValue(int i) {
   806             v =i;
   836             v =i;
   807         }
   837         }
   808     }
   838     }
   809 
   839 
   810 
   840 
   811     private static BValue getCond (String condition) {
   841     private static BValue getCond(String condition) {
   812         synchronized (conditions) {
   842         synchronized (conditions) {
   813             BValue cond = (BValue) conditions.get (condition);
   843             BValue cond = (BValue) conditions.get(condition);
   814             if (cond == null) {
   844             if (cond == null) {
   815                 cond = new BValue();
   845                 cond = new BValue();
   816                 conditions.put (condition, cond);
   846                 conditions.put(condition, cond);
   817             }
   847             }
   818             return cond;
   848             return cond;
   819         }
   849         }
   820     }
   850     }
   821 
   851 
   825      * Threads that subsequently call waitForCondition() will not block.
   855      * Threads that subsequently call waitForCondition() will not block.
   826      * If the named condition did not exist prior to the call, then it is created
   856      * If the named condition did not exist prior to the call, then it is created
   827      * first.
   857      * first.
   828      */
   858      */
   829 
   859 
   830     public static void setCondition (String condition) {
   860     public static void setCondition(String condition) {
   831         BValue cond = getCond (condition);
   861         BValue cond = getCond(condition);
   832         synchronized (cond) {
   862         synchronized (cond) {
   833             if (cond.v) {
   863             if (cond.v) {
   834                 return;
   864                 return;
   835             }
   865             }
   836             cond.v = true;
   866             cond.v = true;
   844      * is false, then the thread blocks until another thread sets the condition.
   874      * is false, then the thread blocks until another thread sets the condition.
   845      * If the condition exists and is already set to true, then this call returns
   875      * If the condition exists and is already set to true, then this call returns
   846      * immediately without blocking.
   876      * immediately without blocking.
   847      */
   877      */
   848 
   878 
   849     public static void waitForCondition (String condition) {
   879     public static void waitForCondition(String condition) {
   850         BValue cond = getCond (condition);
   880         BValue cond = getCond(condition);
   851         synchronized (cond) {
   881         synchronized (cond) {
   852             if (!cond.v) {
   882             if (!cond.v) {
   853                 try {
   883                 try {
   854                     cond.wait();
   884                     cond.wait();
   855                 } catch (InterruptedException e) {}
   885                 } catch (InterruptedException e) {}
   870      * <P>
   900      * <P>
   871      * Obviously, if fewer than N threads make the rendezvous then the result
   901      * Obviously, if fewer than N threads make the rendezvous then the result
   872      * will be a hang.
   902      * will be a hang.
   873      */
   903      */
   874 
   904 
   875     public static void rendezvous (String condition, int N) {
   905     public static void rendezvous(String condition, int N) {
   876         BValue cond;
   906         BValue cond;
   877         IValue iv;
   907         IValue iv;
   878         String name = "RV_"+condition;
   908         String name = "RV_"+condition;
   879 
   909 
   880         /* get the condition */
   910         /* get the condition */
   881 
   911 
   882         synchronized (conditions) {
   912         synchronized (conditions) {
   883             cond = (BValue)conditions.get (name);
   913             cond = (BValue)conditions.get(name);
   884             if (cond == null) {
   914             if (cond == null) {
   885                 /* we are first caller */
   915                 /* we are first caller */
   886                 if (N < 2) {
   916                 if (N < 2) {
   887                     throw new RuntimeException ("rendezvous must be called with N >= 2");
   917                     throw new RuntimeException("rendezvous must be called with N >= 2");
   888                 }
   918                 }
   889                 cond = new BValue ();
   919                 cond = new BValue();
   890                 conditions.put (name, cond);
   920                 conditions.put(name, cond);
   891                 iv = new IValue (N-1);
   921                 iv = new IValue(N-1);
   892                 rv.put (name, iv);
   922                 rv.put(name, iv);
   893             } else {
   923             } else {
   894                 /* already initialised, just decrement the counter */
   924                 /* already initialised, just decrement the counter */
   895                 iv = (IValue) rv.get (name);
   925                 iv = (IValue) rv.get(name);
   896                 iv.v --;
   926                 iv.v--;
   897             }
   927             }
   898         }
   928         }
   899 
   929 
   900         if (iv.v > 0) {
   930         if (iv.v > 0) {
   901             waitForCondition (name);
   931             waitForCondition(name);
   902         } else {
   932         } else {
   903             setCondition (name);
   933             setCondition(name);
   904             synchronized (conditions) {
   934             synchronized (conditions) {
   905                 clearCondition (name);
   935                 clearCondition(name);
   906                 rv.remove (name);
   936                 rv.remove(name);
   907             }
   937             }
   908         }
   938         }
   909     }
   939     }
   910 
   940 
   911     /**
   941     /**
   917      */
   947      */
   918 
   948 
   919     public static void clearCondition(String condition) {
   949     public static void clearCondition(String condition) {
   920         BValue cond;
   950         BValue cond;
   921         synchronized (conditions) {
   951         synchronized (conditions) {
   922             cond = (BValue) conditions.get (condition);
   952             cond = (BValue) conditions.get(condition);
   923             if (cond == null) {
   953             if (cond == null) {
   924                 return;
   954                 return;
   925             }
   955             }
   926             synchronized (cond) {
   956             synchronized (cond) {
   927                 if (cond.v) {
   957                 if (cond.v) {
   928                     conditions.remove (condition);
   958                     conditions.remove(condition);
   929                 }
   959                 }
   930             }
   960             }
   931         }
   961         }
   932     }
   962     }
   933 }
   963 }