8163503: PKCS12 keystore cannot store non-X.509 certificates
Reviewed-by: weijun, xuelei, mullan
/*
* Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package java.net.http;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import static javax.net.ssl.SSLEngineResult.Status.*;
import javax.net.ssl.*;
import static javax.net.ssl.SSLEngineResult.HandshakeStatus.*;
/**
* Asynchronous wrapper around SSLEngine. send and receive is fully non
* blocking. When handshaking is required, a thread is created to perform
* the handshake and application level sends do not take place during this time.
*
* Is implemented using queues and functions operating on the receiving end
* of each queue.
*
* Application writes to:
* ||
* \/
* appOutputQ
* ||
* \/
* appOutputQ read by "upperWrite" method which does SSLEngine.wrap
* and writes to
* ||
* \/
* channelOutputQ
* ||
* \/
* channelOutputQ is read by "lowerWrite" method which is invoked from
* OP_WRITE events on the socket (from selector thread)
*
* Reading side is as follows
* --------------------------
*
* "upperRead" method reads off channelInputQ and calls SSLEngine.unwrap and
* when decrypted data is returned, it is passed to the user's Consumer<ByteBuffer>
* /\
* ||
* channelInputQ
* /\
* ||
* "lowerRead" method puts buffers into channelInputQ. It is invoked from
* OP_READ events from the selector.
*
* Whenever handshaking is required, the doHandshaking() method is called
* which creates a thread to complete the handshake. It takes over the
* channelInputQ from upperRead, and puts outgoing packets on channelOutputQ.
* Selector events are delivered to lowerRead and lowerWrite as normal.
*
* Errors
*
* Any exception thrown by the engine or channel, causes all Queues to be closed
* the channel to be closed, and the error is reported to the user's
* Consumer<Throwable>
*/
class AsyncSSLDelegate implements Closeable, AsyncConnection {
// outgoing buffers put in this queue first and may remain here
// while SSL handshaking happening.
final Queue<ByteBuffer> appOutputQ;
// queue of wrapped ByteBuffers waiting to be sent on socket channel
//final Queue<ByteBuffer> channelOutputQ;
// Bytes read into this queue before being unwrapped. Backup on this
// Q should only happen when the engine is stalled due to delegated tasks
final Queue<ByteBuffer> channelInputQ;
// input occurs through the read() method which is expected to be called
// when the selector signals some data is waiting to be read. All incoming
// handshake data is handled in this method, which means some calls to
// read() may return zero bytes of user data. This is not a sign of spinning,
// just that the handshake mechanics are being executed.
final SSLEngine engine;
final SSLParameters sslParameters;
//final SocketChannel chan;
final HttpConnection lowerOutput;
final HttpClientImpl client;
final ExecutorService executor;
final BufferHandler bufPool;
Consumer<ByteBuffer> receiver;
Consumer<Throwable> errorHandler;
// Locks.
final Object reader = new Object();
final Object writer = new Object();
// synchronizing handshake state
final Object handshaker = new Object();
// flag set when reader or writer is blocked waiting for handshake to finish
boolean writerBlocked;
boolean readerBlocked;
// some thread is currently doing the handshake
boolean handshaking;
// alpn[] may be null. upcall is callback which receives incoming decoded bytes off socket
AsyncSSLDelegate(HttpConnection lowerOutput, HttpClientImpl client, String[] alpn)
{
SSLContext context = client.sslContext();
executor = client.executorService();
bufPool = client;
appOutputQ = new Queue<>();
appOutputQ.registerPutCallback(this::upperWrite);
//channelOutputQ = new Queue<>();
//channelOutputQ.registerPutCallback(this::lowerWrite);
engine = context.createSSLEngine();
engine.setUseClientMode(true);
SSLParameters sslp = client.sslParameters().orElse(null);
if (sslp == null) {
sslp = context.getSupportedSSLParameters();
//sslp = context.getDefaultSSLParameters();
//printParams(sslp);
}
sslParameters = Utils.copySSLParameters(sslp);
if (alpn != null) {
sslParameters.setApplicationProtocols(alpn);
}
logParams(sslParameters);
engine.setSSLParameters(sslParameters);
this.lowerOutput = lowerOutput;
this.client = client;
this.channelInputQ = new Queue<>();
this.channelInputQ.registerPutCallback(this::upperRead);
}
/**
* Put buffers to appOutputQ, and call upperWrite() if q was empty.
*
* @param src
*/
public void write(ByteBuffer[] src) throws IOException {
appOutputQ.putAll(src);
}
public void write(ByteBuffer buf) throws IOException {
ByteBuffer[] a = new ByteBuffer[1];
a[0] = buf;
write(a);
}
@Override
public void close() {
Utils.close(appOutputQ, channelInputQ, lowerOutput);
}
/**
* Attempts to wrap buffers from appOutputQ and place them on the
* channelOutputQ for writing. If handshaking is happening, then the
* process stalls and last buffers taken off the appOutputQ are put back
* into it until handshaking completes.
*
* This same method is called to try and resume output after a blocking
* handshaking operation has completed.
*/
private void upperWrite() {
try {
EngineResult r = null;
ByteBuffer[] buffers = appOutputQ.pollAll(Utils.EMPTY_BB_ARRAY);
int bytes = Utils.remaining(buffers);
while (bytes > 0) {
synchronized (writer) {
r = wrapBuffers(buffers);
int bytesProduced = r.bytesProduced();
int bytesConsumed = r.bytesConsumed();
bytes -= bytesConsumed;
if (bytesProduced > 0) {
// pass destination buffer to channelOutputQ.
lowerOutput.write(r.destBuffer);
}
synchronized (handshaker) {
if (r.handshaking()) {
// handshaking is happening or is needed
// so we put the buffers back on Q to process again
// later. It's possible that some may have already
// been processed, which is ok.
appOutputQ.pushbackAll(buffers);
writerBlocked = true;
if (!handshaking()) {
// execute the handshake in another thread.
// This method will be called again to resume sending
// later
doHandshake(r);
}
return;
}
}
}
}
returnBuffers(buffers);
} catch (Throwable t) {
close();
errorHandler.accept(t);
}
}
private void doHandshake(EngineResult r) {
handshaking = true;
channelInputQ.registerPutCallback(null);
executor.execute(() -> {
try {
doHandshakeImpl(r);
channelInputQ.registerPutCallback(this::upperRead);
} catch (Throwable t) {
close();
errorHandler.accept(t);
}
});
}
private void returnBuffers(ByteBuffer[] bufs) {
for (ByteBuffer buf : bufs)
client.returnBuffer(buf);
}
/**
* Return true if some thread is currently doing the handshake
*
* @return
*/
boolean handshaking() {
synchronized(handshaker) {
return handshaking;
}
}
/**
* Executes entire handshake in calling thread.
* Returns after handshake is completed or error occurs
* @param r
* @throws IOException
*/
private void doHandshakeImpl(EngineResult r) throws IOException {
while (true) {
SSLEngineResult.HandshakeStatus status = r.handshakeStatus();
if (status == NEED_TASK) {
LinkedList<Runnable> tasks = obtainTasks();
for (Runnable task : tasks)
task.run();
r = handshakeWrapAndSend();
} else if (status == NEED_WRAP) {
r = handshakeWrapAndSend();
} else if (status == NEED_UNWRAP) {
r = handshakeReceiveAndUnWrap();
}
if (!r.handshaking())
break;
}
boolean dowrite = false;
boolean doread = false;
// Handshake is finished. Now resume reading and/or writing
synchronized(handshaker) {
handshaking = false;
if (writerBlocked) {
writerBlocked = false;
dowrite = true;
}
if (readerBlocked) {
readerBlocked = false;
doread = true;
}
}
if (dowrite)
upperWrite();
if (doread)
upperRead();
}
// acknowledge a received CLOSE request from peer
void doClosure() throws IOException {
//while (!wrapAndSend(emptyArray))
//;
}
LinkedList<Runnable> obtainTasks() {
LinkedList<Runnable> l = new LinkedList<>();
Runnable r;
while ((r = engine.getDelegatedTask()) != null)
l.add(r);
return l;
}
@Override
public synchronized void setAsyncCallbacks(Consumer<ByteBuffer> asyncReceiver, Consumer<Throwable> errorReceiver) {
this.receiver = asyncReceiver;
this.errorHandler = errorReceiver;
}
@Override
public void startReading() {
// maybe this class does not need to implement AsyncConnection
}
static class EngineResult {
ByteBuffer destBuffer;
ByteBuffer srcBuffer;
SSLEngineResult result;
Throwable t;
boolean handshaking() {
SSLEngineResult.HandshakeStatus s = result.getHandshakeStatus();
return s != FINISHED && s != NOT_HANDSHAKING;
}
int bytesConsumed() {
return result.bytesConsumed();
}
int bytesProduced() {
return result.bytesProduced();
}
Throwable exception() {
return t;
}
SSLEngineResult.HandshakeStatus handshakeStatus() {
return result.getHandshakeStatus();
}
SSLEngineResult.Status status() {
return result.getStatus();
}
}
EngineResult handshakeWrapAndSend() throws IOException {
EngineResult r = wrapBuffer(Utils.EMPTY_BYTEBUFFER);
if (r.bytesProduced() > 0) {
lowerOutput.write(r.destBuffer);
}
return r;
}
// called during handshaking. It blocks until a complete packet
// is available, unwraps it and returns.
EngineResult handshakeReceiveAndUnWrap() throws IOException {
ByteBuffer buf = channelInputQ.take();
while (true) {
// block waiting for input
EngineResult r = unwrapBuffer(buf);
SSLEngineResult.Status status = r.status();
if (status == BUFFER_UNDERFLOW) {
// wait for another buffer to arrive
ByteBuffer buf1 = channelInputQ.take();
buf = combine (buf, buf1);
continue;
}
// OK
// theoretically possible we could receive some user data
if (r.bytesProduced() > 0) {
receiver.accept(r.destBuffer);
}
if (!buf.hasRemaining())
return r;
}
}
EngineResult wrapBuffer(ByteBuffer src) throws SSLException {
ByteBuffer[] bufs = new ByteBuffer[1];
bufs[0] = src;
return wrapBuffers(bufs);
}
EngineResult wrapBuffers(ByteBuffer[] src) throws SSLException {
EngineResult r = new EngineResult();
ByteBuffer dst = bufPool.getBuffer();
while (true) {
r.result = engine.wrap(src, dst);
switch (r.result.getStatus()) {
case BUFFER_OVERFLOW:
dst = getPacketBuffer();
break;
case CLOSED:
case OK:
dst.flip();
r.destBuffer = dst;
return r;
case BUFFER_UNDERFLOW:
// underflow handled externally
bufPool.returnBuffer(dst);
return r;
default:
assert false;
}
}
}
EngineResult unwrapBuffer(ByteBuffer srcbuf) throws IOException {
EngineResult r = new EngineResult();
r.srcBuffer = srcbuf;
ByteBuffer dst = bufPool.getBuffer();
while (true) {
r.result = engine.unwrap(srcbuf, dst);
switch (r.result.getStatus()) {
case BUFFER_OVERFLOW:
// dest buffer not big enough. Reallocate
int oldcap = dst.capacity();
dst = getApplicationBuffer();
assert dst.capacity() > oldcap;
break;
case CLOSED:
doClosure();
throw new IOException("Engine closed");
case BUFFER_UNDERFLOW:
bufPool.returnBuffer(dst);
return r;
case OK:
dst.flip();
r.destBuffer = dst;
return r;
}
}
}
/**
* Asynchronous read input. Call this when selector fires.
* Unwrap done in upperRead because it also happens in
* doHandshake() when handshake taking place
*/
public void lowerRead(ByteBuffer buffer) {
try {
channelInputQ.put(buffer);
} catch (Throwable t) {
close();
errorHandler.accept(t);
}
}
public void upperRead() {
EngineResult r;
ByteBuffer srcbuf;
synchronized (reader) {
try {
srcbuf = channelInputQ.poll();
if (srcbuf == null) {
return;
}
while (true) {
r = unwrapBuffer(srcbuf);
switch (r.result.getStatus()) {
case BUFFER_UNDERFLOW:
// Buffer too small. Need to combine with next buf
ByteBuffer nextBuf = channelInputQ.poll();
if (nextBuf == null) {
// no data available. push buffer back until more data available
channelInputQ.pushback(srcbuf);
return;
} else {
srcbuf = combine(srcbuf, nextBuf);
}
break;
case OK:
// check for any handshaking work
synchronized (handshaker) {
if (r.handshaking()) {
// handshaking is happening or is needed
// so we put the buffer back on Q to process again
// later.
channelInputQ.pushback(srcbuf);
readerBlocked = true;
if (!handshaking()) {
// execute the handshake in another thread.
// This method will be called again to resume sending
// later
doHandshake(r);
}
return;
}
}
ByteBuffer dst = r.destBuffer;
if (dst.hasRemaining()) {
receiver.accept(dst);
}
}
if (srcbuf.hasRemaining()) {
continue;
}
srcbuf = channelInputQ.poll();
if (srcbuf == null) {
return;
}
}
} catch (Throwable t) {
close();
errorHandler.accept(t);
}
}
}
/**
* Get a new buffer that is the right size for application buffers.
*
* @return
*/
ByteBuffer getApplicationBuffer() {
SSLSession session = engine.getSession();
int appBufsize = session.getApplicationBufferSize();
bufPool.setMinBufferSize(appBufsize);
return bufPool.getBuffer(appBufsize);
}
ByteBuffer getPacketBuffer() {
SSLSession session = engine.getSession();
int packetBufSize = session.getPacketBufferSize();
bufPool.setMinBufferSize(packetBufSize);
return bufPool.getBuffer(packetBufSize);
}
ByteBuffer combine(ByteBuffer buf1, ByteBuffer buf2) {
int avail1 = buf1.capacity() - buf1.remaining();
if (buf2.remaining() < avail1) {
buf1.compact();
buf1.put(buf2);
buf1.flip();
return buf1;
}
int newsize = buf1.remaining() + buf2.remaining();
ByteBuffer newbuf = bufPool.getBuffer(newsize);
newbuf.put(buf1);
newbuf.put(buf2);
newbuf.flip();
return newbuf;
}
SSLParameters getSSLParameters() {
return sslParameters;
}
static void logParams(SSLParameters p) {
if (!Log.ssl()) {
return;
}
Log.logSSL("SSLParameters:");
if (p == null) {
Log.logSSL("Null params");
return;
}
if (p.getCipherSuites() != null) {
for (String cipher : p.getCipherSuites()) {
Log.logSSL("cipher: {0}\n", cipher);
}
}
// SSLParameters.getApplicationProtocols() can't return null
for (String approto : p.getApplicationProtocols()) {
Log.logSSL("application protocol: {0}\n", approto);
}
if (p.getProtocols() != null) {
for (String protocol : p.getProtocols()) {
Log.logSSL("protocol: {0}\n", protocol);
}
}
if (p.getServerNames() != null) {
for (SNIServerName sname : p.getServerNames()) {
Log.logSSL("server name: {0}\n", sname.toString());
}
}
}
String getSessionInfo() {
StringBuilder sb = new StringBuilder();
String application = engine.getApplicationProtocol();
SSLSession sess = engine.getSession();
String cipher = sess.getCipherSuite();
String protocol = sess.getProtocol();
sb.append("Handshake complete alpn: ")
.append(application)
.append(", Cipher: ")
.append(cipher)
.append(", Protocol: ")
.append(protocol);
return sb.toString();
}
}