104 * @param cperthread the number of simultaneous TCP connections |
104 * @param cperthread the number of simultaneous TCP connections |
105 * to handle per thread |
105 * to handle per thread |
106 * @param port the port number to bind the server to. <code>Zero</code> |
106 * @param port the port number to bind the server to. <code>Zero</code> |
107 * means choose any free port. |
107 * means choose any free port. |
108 */ |
108 */ |
109 |
109 public TestHttpsServer(HttpCallback cb, int threads, int cperthread, int port) |
110 public TestHttpsServer (HttpCallback cb, int threads, int cperthread, int port) |
|
111 throws IOException { |
110 throws IOException { |
112 schan = ServerSocketChannel.open (); |
111 this(cb, threads, cperthread, null, port); |
113 InetSocketAddress addr = new InetSocketAddress (port); |
112 } |
114 schan.socket().bind (addr); |
113 |
|
114 /** |
|
115 * Create a <code>TestHttpsServer<code> instance with the specified number |
|
116 * of threads and maximum number of connections per thread and running on |
|
117 * the specified port. The specified number of threads are created to |
|
118 * handle incoming requests, and each thread is allowed |
|
119 * to handle a number of simultaneous TCP connections. |
|
120 * @param cb the callback object which is invoked to handle |
|
121 * each incoming request |
|
122 * @param threads the number of threads to create to handle |
|
123 * requests in parallel |
|
124 * @param cperthread the number of simultaneous TCP connections |
|
125 * to handle per thread |
|
126 * @param address the InetAddress to bind to. {@code Null} means the |
|
127 * wildcard address. |
|
128 * @param port the port number to bind the server to. {@code Zero} |
|
129 * means choose any free port. |
|
130 */ |
|
131 |
|
132 public TestHttpsServer(HttpCallback cb, int threads, int cperthread, InetAddress address, int port) |
|
133 throws IOException { |
|
134 schan = ServerSocketChannel.open(); |
|
135 InetSocketAddress addr = new InetSocketAddress(address, port); |
|
136 schan.socket().bind(addr); |
115 this.threads = threads; |
137 this.threads = threads; |
116 this.cb = cb; |
138 this.cb = cb; |
117 this.cperthread = cperthread; |
139 this.cperthread = cperthread; |
118 |
140 |
119 try { |
141 try { |
133 |
155 |
134 sslCtx = SSLContext.getInstance("TLS"); |
156 sslCtx = SSLContext.getInstance("TLS"); |
135 |
157 |
136 sslCtx.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null); |
158 sslCtx.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null); |
137 |
159 |
138 servers = new Server [threads]; |
160 servers = new Server[threads]; |
139 for (int i=0; i<threads; i++) { |
161 for (int i=0; i<threads; i++) { |
140 servers[i] = new Server (cb, schan, cperthread); |
162 servers[i] = new Server(cb, schan, cperthread); |
141 servers[i].start(); |
163 servers[i].start(); |
142 } |
164 } |
143 } catch (Exception ex) { |
165 } catch (Exception ex) { |
144 throw new RuntimeException("test failed. cause: "+ex.getMessage()); |
166 throw new RuntimeException("test failed. cause: "+ex.getMessage()); |
145 } |
167 } |
176 int maxconn; |
206 int maxconn; |
177 int nconn; |
207 int nconn; |
178 ClosedChannelList clist; |
208 ClosedChannelList clist; |
179 boolean shutdown; |
209 boolean shutdown; |
180 |
210 |
181 Server (HttpCallback cb, ServerSocketChannel schan, int maxconn) { |
211 Server(HttpCallback cb, ServerSocketChannel schan, int maxconn) { |
182 this.schan = schan; |
212 this.schan = schan; |
183 this.maxconn = maxconn; |
213 this.maxconn = maxconn; |
184 this.cb = cb; |
214 this.cb = cb; |
185 nconn = 0; |
215 nconn = 0; |
186 consumeBuffer = ByteBuffer.allocate (512); |
216 consumeBuffer = ByteBuffer.allocate(512); |
187 clist = new ClosedChannelList (); |
217 clist = new ClosedChannelList(); |
188 try { |
218 try { |
189 selector = Selector.open (); |
219 selector = Selector.open(); |
190 schan.configureBlocking (false); |
220 schan.configureBlocking(false); |
191 listenerKey = schan.register (selector, SelectionKey.OP_ACCEPT); |
221 listenerKey = schan.register(selector, SelectionKey.OP_ACCEPT); |
192 } catch (IOException e) { |
222 } catch (IOException e) { |
193 System.err.println ("Server could not start: " + e); |
223 System.err.println("Server could not start: " + e); |
194 } |
224 } |
195 } |
225 } |
196 |
226 |
197 /* Stop the thread as soon as possible */ |
227 /* Stop the thread as soon as possible */ |
198 public synchronized void terminate () { |
228 public synchronized void terminate() { |
199 shutdown = true; |
229 shutdown = true; |
200 } |
230 } |
201 |
231 |
202 public void run () { |
232 public void run() { |
203 try { |
233 try { |
204 while (true) { |
234 while (true) { |
205 selector.select (1000); |
235 selector.select(1000); |
206 Set selected = selector.selectedKeys(); |
236 Set selected = selector.selectedKeys(); |
207 Iterator iter = selected.iterator(); |
237 Iterator iter = selected.iterator(); |
208 while (iter.hasNext()) { |
238 while (iter.hasNext()) { |
209 key = (SelectionKey)iter.next(); |
239 key = (SelectionKey)iter.next(); |
210 if (key.equals (listenerKey)) { |
240 if (key.equals (listenerKey)) { |
211 SocketChannel sock = schan.accept (); |
241 SocketChannel sock = schan.accept(); |
212 if (sock == null) { |
242 if (sock == null) { |
213 /* false notification */ |
243 /* false notification */ |
214 iter.remove(); |
244 iter.remove(); |
215 continue; |
245 continue; |
216 } |
246 } |
217 sock.configureBlocking (true); |
247 sock.configureBlocking(true); |
218 SSLEngine sslEng = sslCtx.createSSLEngine(); |
248 SSLEngine sslEng = sslCtx.createSSLEngine(); |
219 sslEng.setUseClientMode(false); |
249 sslEng.setUseClientMode(false); |
220 new ServerWorker(cb, sock, sslEng).start(); |
250 new ServerWorker(cb, sock, sslEng).start(); |
221 nconn ++; |
251 nconn ++; |
222 if (nconn == maxconn) { |
252 if (nconn == maxconn) { |
223 /* deregister */ |
253 /* deregister */ |
224 listenerKey.cancel (); |
254 listenerKey.cancel(); |
225 listenerKey = null; |
255 listenerKey = null; |
226 } |
256 } |
227 } else { |
257 } else { |
228 if (key.isReadable()) { |
258 if (key.isReadable()) { |
229 boolean closed = false; |
259 boolean closed = false; |
230 SocketChannel chan = (SocketChannel) key.channel(); |
260 SocketChannel chan = (SocketChannel)key.channel(); |
231 if (key.attachment() != null) { |
261 if (key.attachment() != null) { |
232 closed = consume (chan); |
262 closed = consume(chan); |
233 } |
263 } |
234 |
264 |
235 if (closed) { |
265 if (closed) { |
236 chan.close (); |
266 chan.close(); |
237 key.cancel (); |
267 key.cancel(); |
238 if (nconn == maxconn) { |
268 if (nconn == maxconn) { |
239 listenerKey = schan.register (selector, SelectionKey.OP_ACCEPT); |
269 listenerKey = schan.register(selector, SelectionKey.OP_ACCEPT); |
240 } |
270 } |
241 nconn --; |
271 nconn --; |
242 } |
272 } |
243 } |
273 } |
244 } |
274 } |
429 } |
459 } |
430 } |
460 } |
431 |
461 |
432 /* return true if the connection is closed, false otherwise */ |
462 /* return true if the connection is closed, false otherwise */ |
433 |
463 |
434 private boolean read (SocketChannel chan, SSLEngine sslEng) { |
464 private boolean read(SocketChannel chan, SSLEngine sslEng) { |
435 HttpTransaction msg; |
465 HttpTransaction msg; |
436 boolean res; |
466 boolean res; |
437 try { |
467 try { |
438 InputStream is = new BufferedInputStream (new NioInputStream (chan, sslEng, inNetBB, inAppBB)); |
468 InputStream is = new BufferedInputStream(new NioInputStream(chan, sslEng, inNetBB, inAppBB)); |
439 String requestline = readLine (is); |
469 String requestline = readLine(is); |
440 MessageHeader mhead = new MessageHeader (is); |
470 MessageHeader mhead = new MessageHeader(is); |
441 String clen = mhead.findValue ("Content-Length"); |
471 String clen = mhead.findValue("Content-Length"); |
442 String trferenc = mhead.findValue ("Transfer-Encoding"); |
472 String trferenc = mhead.findValue("Transfer-Encoding"); |
443 String data = null; |
473 String data = null; |
444 if (trferenc != null && trferenc.equals ("chunked")) |
474 if (trferenc != null && trferenc.equals("chunked")) |
445 data = new String (readChunkedData (is)); |
475 data = new String(readChunkedData(is)); |
446 else if (clen != null) |
476 else if (clen != null) |
447 data = new String (readNormalData (is, Integer.parseInt (clen))); |
477 data = new String(readNormalData(is, Integer.parseInt(clen))); |
448 String[] req = requestline.split (" "); |
478 String[] req = requestline.split(" "); |
449 if (req.length < 2) { |
479 if (req.length < 2) { |
450 /* invalid request line */ |
480 /* invalid request line */ |
451 return false; |
481 return false; |
452 } |
482 } |
453 String cmd = req[0]; |
483 String cmd = req[0]; |
454 URI uri = null; |
484 URI uri = null; |
455 try { |
485 try { |
456 uri = new URI (req[1]); |
486 uri = new URI(req[1]); |
457 msg = new HttpTransaction (this, cmd, uri, mhead, data, null, chan); |
487 msg = new HttpTransaction(this, cmd, uri, mhead, data, null, chan); |
458 cb.request (msg); |
488 cb.request(msg); |
459 } catch (URISyntaxException e) { |
489 } catch (URISyntaxException e) { |
460 System.err.println ("Invalid URI: " + e); |
490 System.err.println ("Invalid URI: " + e); |
461 msg = new HttpTransaction (this, cmd, null, null, null, null, chan); |
491 msg = new HttpTransaction(this, cmd, null, null, null, null, chan); |
462 msg.sendResponse (501, "Whatever"); |
492 msg.sendResponse(501, "Whatever"); |
463 } |
493 } |
464 res = false; |
494 res = false; |
465 } catch (IOException e) { |
495 } catch (IOException e) { |
466 res = true; |
496 res = true; |
467 } |
497 } |
468 return res; |
498 return res; |
469 } |
499 } |
470 |
500 |
471 byte[] readNormalData (InputStream is, int len) throws IOException { |
501 byte[] readNormalData(InputStream is, int len) throws IOException { |
472 byte [] buf = new byte [len]; |
502 byte[] buf = new byte[len]; |
473 int c, off=0, remain=len; |
503 int c, off=0, remain=len; |
474 while (remain > 0 && ((c=is.read (buf, off, remain))>0)) { |
504 while (remain > 0 && ((c=is.read (buf, off, remain))>0)) { |
475 remain -= c; |
505 remain -= c; |
476 off += c; |
506 off += c; |
477 } |
507 } |
487 throw new IOException( |
517 throw new IOException( |
488 "Expected <CR><LF>: got '" + cr + "/" + lf + "'"); |
518 "Expected <CR><LF>: got '" + cr + "/" + lf + "'"); |
489 } |
519 } |
490 } |
520 } |
491 |
521 |
492 byte[] readChunkedData (InputStream is) throws IOException { |
522 byte[] readChunkedData(InputStream is) throws IOException { |
493 LinkedList l = new LinkedList (); |
523 LinkedList l = new LinkedList(); |
494 int total = 0; |
524 int total = 0; |
495 for (int len=readChunkLen(is); len!=0; len=readChunkLen(is)) { |
525 for (int len=readChunkLen(is); len!=0; len=readChunkLen(is)) { |
496 l.add (readNormalData(is, len)); |
526 l.add(readNormalData(is, len)); |
497 total += len; |
527 total += len; |
498 readCRLF(is); // CRLF at end of chunk |
528 readCRLF(is); // CRLF at end of chunk |
499 } |
529 } |
500 readCRLF(is); // CRLF at end of Chunked Stream. |
530 readCRLF(is); // CRLF at end of Chunked Stream. |
501 byte[] buf = new byte [total]; |
531 byte[] buf = new byte[total]; |
502 Iterator i = l.iterator(); |
532 Iterator i = l.iterator(); |
503 int x = 0; |
533 int x = 0; |
504 while (i.hasNext()) { |
534 while (i.hasNext()) { |
505 byte[] b = (byte[])i.next(); |
535 byte[] b = (byte[])i.next(); |
506 System.arraycopy (b, 0, buf, x, b.length); |
536 System.arraycopy(b, 0, buf, x, b.length); |
507 x += b.length; |
537 x += b.length; |
508 } |
538 } |
509 return buf; |
539 return buf; |
510 } |
540 } |
511 |
541 |
512 private int readChunkLen (InputStream is) throws IOException { |
542 private int readChunkLen(InputStream is) throws IOException { |
513 int c, len=0; |
543 int c, len=0; |
514 boolean done=false, readCR=false; |
544 boolean done=false, readCR=false; |
515 while (!done) { |
545 while (!done) { |
516 c = is.read (); |
546 c = is.read(); |
517 if (c == '\n' && readCR) { |
547 if (c == '\n' && readCR) { |
518 done = true; |
548 done = true; |
519 } else { |
549 } else { |
520 if (c == '\r' && !readCR) { |
550 if (c == '\r' && !readCR) { |
521 readCR = true; |
551 readCR = true; |
533 } |
563 } |
534 } |
564 } |
535 return len; |
565 return len; |
536 } |
566 } |
537 |
567 |
538 private String readLine (InputStream is) throws IOException { |
568 private String readLine(InputStream is) throws IOException { |
539 boolean done=false, readCR=false; |
569 boolean done=false, readCR=false; |
540 byte[] b = new byte [512]; |
570 byte[] b = new byte[512]; |
541 int c, l = 0; |
571 int c, l = 0; |
542 |
572 |
543 while (!done) { |
573 while (!done) { |
544 c = is.read (); |
574 c = is.read(); |
545 if (c == '\n' && readCR) { |
575 if (c == '\n' && readCR) { |
546 done = true; |
576 done = true; |
547 } else { |
577 } else { |
548 if (c == '\r' && !readCR) { |
578 if (c == '\r' && !readCR) { |
549 readCR = true; |
579 readCR = true; |
550 } else { |
580 } else { |
551 b[l++] = (byte)c; |
581 b[l++] = (byte)c; |
552 } |
582 } |
553 } |
583 } |
554 } |
584 } |
555 return new String (b); |
585 return new String(b); |
556 } |
586 } |
557 |
587 |
558 /** close the channel associated with the current key by: |
588 /** close the channel associated with the current key by: |
559 * 1. shutdownOutput (send a FIN) |
589 * 1. shutdownOutput (send a FIN) |
560 * 2. mark the key so that incoming data is to be consumed and discarded |
590 * 2. mark the key so that incoming data is to be consumed and discarded |
561 * 3. After a period, close the socket |
591 * 3. After a period, close the socket |
562 */ |
592 */ |
563 |
593 |
564 synchronized void orderlyCloseChannel (SocketChannel ch) throws IOException { |
594 synchronized void orderlyCloseChannel(SocketChannel ch) throws IOException { |
565 ch.socket().shutdownOutput(); |
595 ch.socket().shutdownOutput(); |
566 } |
596 } |
567 |
597 |
568 synchronized void abortiveCloseChannel (SocketChannel ch) throws IOException { |
598 synchronized void abortiveCloseChannel(SocketChannel ch) throws IOException { |
569 Socket s = ch.socket (); |
599 Socket s = ch.socket(); |
570 s.setSoLinger (true, 0); |
600 s.setSoLinger(true, 0); |
571 ch.close(); |
601 ch.close(); |
572 } |
602 } |
573 } |
603 } |
574 |
604 |
575 |
605 |
590 ByteBuffer markBuf; /* reads may be satisifed from this buffer */ |
620 ByteBuffer markBuf; /* reads may be satisifed from this buffer */ |
591 boolean marked; |
621 boolean marked; |
592 boolean reset; |
622 boolean reset; |
593 int readlimit; |
623 int readlimit; |
594 |
624 |
595 public NioInputStream (SocketChannel chan, SSLEngine sslEng, ByteBuffer inNetBB, ByteBuffer inAppBB) throws IOException { |
625 public NioInputStream(SocketChannel chan, SSLEngine sslEng, ByteBuffer inNetBB, ByteBuffer inAppBB) throws IOException { |
596 this.sslEng = sslEng; |
626 this.sslEng = sslEng; |
597 this.channel = chan; |
627 this.channel = chan; |
598 selector = Selector.open(); |
628 selector = Selector.open(); |
599 this.inNetBB = inNetBB; |
629 this.inNetBB = inNetBB; |
600 this.inAppBB = inAppBB; |
630 this.inAppBB = inAppBB; |
601 key = chan.register (selector, SelectionKey.OP_READ); |
631 key = chan.register(selector, SelectionKey.OP_READ); |
602 available = 0; |
632 available = 0; |
603 one = new byte[1]; |
633 one = new byte[1]; |
604 closed = marked = reset = false; |
634 closed = marked = reset = false; |
605 } |
635 } |
606 |
636 |
607 public synchronized int read (byte[] b) throws IOException { |
637 public synchronized int read(byte[] b) throws IOException { |
608 return read (b, 0, b.length); |
638 return read(b, 0, b.length); |
609 } |
639 } |
610 |
640 |
611 public synchronized int read () throws IOException { |
641 public synchronized int read() throws IOException { |
612 return read (one, 0, 1); |
642 return read(one, 0, 1); |
613 } |
643 } |
614 |
644 |
615 public synchronized int read (byte[] b, int off, int srclen) throws IOException { |
645 public synchronized int read(byte[] b, int off, int srclen) throws IOException { |
616 |
646 |
617 int canreturn, willreturn; |
647 int canreturn, willreturn; |
618 |
648 |
619 if (closed) |
649 if (closed) |
620 return -1; |
650 return -1; |
621 |
651 |
622 if (reset) { /* satisfy from markBuf */ |
652 if (reset) { /* satisfy from markBuf */ |
623 canreturn = markBuf.remaining (); |
653 canreturn = markBuf.remaining(); |
624 willreturn = canreturn>srclen ? srclen : canreturn; |
654 willreturn = canreturn > srclen ? srclen : canreturn; |
625 markBuf.get(b, off, willreturn); |
655 markBuf.get(b, off, willreturn); |
626 if (canreturn == willreturn) { |
656 if (canreturn == willreturn) { |
627 reset = false; |
657 reset = false; |
628 } |
658 } |
629 } else { /* satisfy from channel */ |
659 } else { /* satisfy from channel */ |
630 canreturn = available(); |
660 canreturn = available(); |
631 if (canreturn == 0) { |
661 if (canreturn == 0) { |
632 block (); |
662 block(); |
633 canreturn = available(); |
663 canreturn = available(); |
634 } |
664 } |
635 willreturn = canreturn>srclen ? srclen : canreturn; |
665 willreturn = canreturn > srclen ? srclen : canreturn; |
636 inAppBB.get(b, off, willreturn); |
666 inAppBB.get(b, off, willreturn); |
637 available -= willreturn; |
667 available -= willreturn; |
638 |
668 |
639 if (marked) { /* copy into markBuf */ |
669 if (marked) { /* copy into markBuf */ |
640 try { |
670 try { |
641 markBuf.put (b, off, willreturn); |
671 markBuf.put(b, off, willreturn); |
642 } catch (BufferOverflowException e) { |
672 } catch (BufferOverflowException e) { |
643 marked = false; |
673 marked = false; |
644 } |
674 } |
645 } |
675 } |
646 } |
676 } |
647 return willreturn; |
677 return willreturn; |
648 } |
678 } |
649 |
679 |
650 public synchronized int available () throws IOException { |
680 public synchronized int available() throws IOException { |
651 if (closed) |
681 if (closed) |
652 throw new IOException ("Stream is closed"); |
682 throw new IOException("Stream is closed"); |
653 |
683 |
654 if (reset) |
684 if (reset) |
655 return markBuf.remaining(); |
685 return markBuf.remaining(); |
656 |
686 |
657 if (available > 0) |
687 if (available > 0) |
658 return available; |
688 return available; |
659 |
689 |
660 inAppBB.clear (); |
690 inAppBB.clear(); |
661 int bytes = channel.read (inNetBB); |
691 int bytes = channel.read(inNetBB); |
662 |
692 |
663 int needed = sslEng.getSession().getApplicationBufferSize(); |
693 int needed = sslEng.getSession().getApplicationBufferSize(); |
664 if (needed > inAppBB.remaining()) { |
694 if (needed > inAppBB.remaining()) { |
665 inAppBB = ByteBuffer.allocate(needed); |
695 inAppBB = ByteBuffer.allocate(needed); |
666 } |
696 } |
670 available = result.bytesProduced(); |
700 available = result.bytesProduced(); |
671 |
701 |
672 if (available > 0) |
702 if (available > 0) |
673 inAppBB.flip(); |
703 inAppBB.flip(); |
674 else if (available == -1) |
704 else if (available == -1) |
675 throw new IOException ("Stream is closed"); |
705 throw new IOException("Stream is closed"); |
676 return available; |
706 return available; |
677 } |
707 } |
678 |
708 |
679 /** |
709 /** |
680 * block() only called when available==0 and buf is empty |
710 * block() only called when available==0 and buf is empty |
681 */ |
711 */ |
682 private synchronized void block () throws IOException { |
712 private synchronized void block() throws IOException { |
683 //assert available == 0; |
713 //assert available == 0; |
684 int n = selector.select (); |
714 int n = selector.select(); |
685 //assert n == 1; |
715 //assert n == 1; |
686 selector.selectedKeys().clear(); |
716 selector.selectedKeys().clear(); |
687 available (); |
717 available(); |
688 } |
718 } |
689 |
719 |
690 public void close () throws IOException { |
720 public void close() throws IOException { |
691 if (closed) |
721 if (closed) |
692 return; |
722 return; |
693 channel.close (); |
723 channel.close(); |
694 closed = true; |
724 closed = true; |
695 } |
725 } |
696 |
726 |
697 public synchronized void mark (int readlimit) { |
727 public synchronized void mark(int readlimit) { |
698 if (closed) |
728 if (closed) |
699 return; |
729 return; |
700 this.readlimit = readlimit; |
730 this.readlimit = readlimit; |
701 markBuf = ByteBuffer.allocate (readlimit); |
731 markBuf = ByteBuffer.allocate(readlimit); |
702 marked = true; |
732 marked = true; |
703 reset = false; |
733 reset = false; |
704 } |
734 } |
705 |
735 |
706 public synchronized void reset () throws IOException { |
736 public synchronized void reset() throws IOException { |
707 if (closed ) |
737 if (closed ) |
708 return; |
738 return; |
709 if (!marked) |
739 if (!marked) |
710 throw new IOException ("Stream not marked"); |
740 throw new IOException("Stream not marked"); |
711 marked = false; |
741 marked = false; |
712 reset = true; |
742 reset = true; |
713 markBuf.flip (); |
743 markBuf.flip(); |
714 } |
744 } |
715 } |
745 } |
716 |
746 |
717 static class NioOutputStream extends OutputStream { |
747 static class NioOutputStream extends OutputStream { |
718 SSLEngine sslEng; |
748 SSLEngine sslEng; |
722 SelectionKey key; |
752 SelectionKey key; |
723 Selector selector; |
753 Selector selector; |
724 boolean closed; |
754 boolean closed; |
725 byte[] one; |
755 byte[] one; |
726 |
756 |
727 public NioOutputStream (SocketChannel channel, SSLEngine sslEng, ByteBuffer outNetBB, ByteBuffer outAppBB) throws IOException { |
757 public NioOutputStream(SocketChannel channel, SSLEngine sslEng, ByteBuffer outNetBB, ByteBuffer outAppBB) throws IOException { |
728 this.sslEng = sslEng; |
758 this.sslEng = sslEng; |
729 this.channel = channel; |
759 this.channel = channel; |
730 this.outNetBB = outNetBB; |
760 this.outNetBB = outNetBB; |
731 this.outAppBB = outAppBB; |
761 this.outAppBB = outAppBB; |
732 selector = Selector.open (); |
762 selector = Selector.open(); |
733 key = channel.register (selector, SelectionKey.OP_WRITE); |
763 key = channel.register(selector, SelectionKey.OP_WRITE); |
734 closed = false; |
764 closed = false; |
735 one = new byte [1]; |
765 one = new byte[1]; |
736 } |
766 } |
737 |
767 |
738 public synchronized void write (int b) throws IOException { |
768 public synchronized void write(int b) throws IOException { |
739 one[0] = (byte)b; |
769 one[0] = (byte)b; |
740 write (one, 0, 1); |
770 write(one, 0, 1); |
741 } |
771 } |
742 |
772 |
743 public synchronized void write (byte[] b) throws IOException { |
773 public synchronized void write(byte[] b) throws IOException { |
744 write (b, 0, b.length); |
774 write(b, 0, b.length); |
745 } |
775 } |
746 |
776 |
747 public synchronized void write (byte[] b, int off, int len) throws IOException { |
777 public synchronized void write(byte[] b, int off, int len) throws IOException { |
748 if (closed) |
778 if (closed) |
749 throw new IOException ("stream is closed"); |
779 throw new IOException("stream is closed"); |
750 |
780 |
751 outAppBB = ByteBuffer.allocate (len); |
781 outAppBB = ByteBuffer.allocate(len); |
752 outAppBB.put (b, off, len); |
782 outAppBB.put(b, off, len); |
753 outAppBB.flip (); |
783 outAppBB.flip(); |
754 int n; |
784 int n; |
755 outNetBB.clear(); |
785 outNetBB.clear(); |
756 int needed = sslEng.getSession().getPacketBufferSize(); |
786 int needed = sslEng.getSession().getPacketBufferSize(); |
757 if (outNetBB.capacity() < needed) { |
787 if (outNetBB.capacity() < needed) { |
758 outNetBB = ByteBuffer.allocate(needed); |
788 outNetBB = ByteBuffer.allocate(needed); |
870 * <P> |
900 * <P> |
871 * Obviously, if fewer than N threads make the rendezvous then the result |
901 * Obviously, if fewer than N threads make the rendezvous then the result |
872 * will be a hang. |
902 * will be a hang. |
873 */ |
903 */ |
874 |
904 |
875 public static void rendezvous (String condition, int N) { |
905 public static void rendezvous(String condition, int N) { |
876 BValue cond; |
906 BValue cond; |
877 IValue iv; |
907 IValue iv; |
878 String name = "RV_"+condition; |
908 String name = "RV_"+condition; |
879 |
909 |
880 /* get the condition */ |
910 /* get the condition */ |
881 |
911 |
882 synchronized (conditions) { |
912 synchronized (conditions) { |
883 cond = (BValue)conditions.get (name); |
913 cond = (BValue)conditions.get(name); |
884 if (cond == null) { |
914 if (cond == null) { |
885 /* we are first caller */ |
915 /* we are first caller */ |
886 if (N < 2) { |
916 if (N < 2) { |
887 throw new RuntimeException ("rendezvous must be called with N >= 2"); |
917 throw new RuntimeException("rendezvous must be called with N >= 2"); |
888 } |
918 } |
889 cond = new BValue (); |
919 cond = new BValue(); |
890 conditions.put (name, cond); |
920 conditions.put(name, cond); |
891 iv = new IValue (N-1); |
921 iv = new IValue(N-1); |
892 rv.put (name, iv); |
922 rv.put(name, iv); |
893 } else { |
923 } else { |
894 /* already initialised, just decrement the counter */ |
924 /* already initialised, just decrement the counter */ |
895 iv = (IValue) rv.get (name); |
925 iv = (IValue) rv.get(name); |
896 iv.v --; |
926 iv.v--; |
897 } |
927 } |
898 } |
928 } |
899 |
929 |
900 if (iv.v > 0) { |
930 if (iv.v > 0) { |
901 waitForCondition (name); |
931 waitForCondition(name); |
902 } else { |
932 } else { |
903 setCondition (name); |
933 setCondition(name); |
904 synchronized (conditions) { |
934 synchronized (conditions) { |
905 clearCondition (name); |
935 clearCondition(name); |
906 rv.remove (name); |
936 rv.remove(name); |
907 } |
937 } |
908 } |
938 } |
909 } |
939 } |
910 |
940 |
911 /** |
941 /** |