test/jdk/java/net/httpclient/http2/server/Queue.java
branchhttp-client-branch
changeset 55852 32f6aefec11e
parent 55819 18e431209168
child 55892 9f345a976249
equal deleted inserted replaced
55851:0bd10b7df2d2 55852:32f6aefec11e
    33 // for output. Can be used blocking or asynchronously.
    33 // for output. Can be used blocking or asynchronously.
    34 
    34 
    35 public class Queue<T> implements ExceptionallyCloseable {
    35 public class Queue<T> implements ExceptionallyCloseable {
    36 
    36 
    37     private final LinkedList<T> q = new LinkedList<>();
    37     private final LinkedList<T> q = new LinkedList<>();
    38     private volatile boolean closed = false;
    38     private boolean closed = false;
    39     private volatile Throwable exception = null;
    39     private boolean closing = false;
       
    40     private Throwable exception = null;
    40     private Runnable callback;
    41     private Runnable callback;
    41     private boolean callbackDisabled = false;
    42     private boolean callbackDisabled = false;
    42     private int waiters; // true if someone waiting
    43     private int waiters; // true if someone waiting
       
    44     private final T closeSentinel;
       
    45 
       
    46     Queue(T closeSentinel) {
       
    47         this.closeSentinel = closeSentinel;
       
    48     }
    43 
    49 
    44     public synchronized int size() {
    50     public synchronized int size() {
    45         return q.size();
    51         return q.size();
    46     }
    52     }
    47 
    53 
    48 //    public synchronized boolean tryPut(T obj) throws IOException {
       
    49 //        if (closed) return false;
       
    50 //        put(obj);
       
    51 //        return true;
       
    52 //    }
       
    53 
       
    54     public synchronized void put(T obj) throws IOException {
    54     public synchronized void put(T obj) throws IOException {
    55         if (closed) {
    55         if (closed || closing) {
    56             throw new IOException("stream closed");
    56             throw new IOException("stream closed");
    57         }
    57         }
    58 
    58 
    59         q.add(obj);
    59         q.add(obj);
    60 
    60 
    71             // dangerous and may lead to deadlocks.
    71             // dangerous and may lead to deadlocks.
    72             callback.run();
    72             callback.run();
    73         }
    73         }
    74     }
    74     }
    75 
    75 
    76 //    public synchronized void disableCallback() {
    76     // Other close() variants are immediate and abortive
    77 //        callbackDisabled = true;
    77     // This allows whatever is on Q to be processed first.
    78 //    }
       
    79 
    78 
    80 //    public synchronized void enableCallback() {
    79     public synchronized void orderlyClose() {
    81 //        callbackDisabled = false;
    80         if (closing || closed)
    82 //        while (q.size() > 0) {
    81             return;
    83 //            callback.run();
    82         try {
    84 //        }
    83             put(closeSentinel);
    85 //    }
    84         } catch (IOException e) {
    86 
    85             e.printStackTrace();
    87 //    /**
    86         }
    88 //     * callback is invoked any time put is called where
    87         closing = true;
    89 //     * the Queue was empty.
    88     }
    90 //     */
       
    91 //    public synchronized void registerPutCallback(Runnable callback) {
       
    92 //        Objects.requireNonNull(callback);
       
    93 //        this.callback = callback;
       
    94 //        if (q.size() > 0) {
       
    95 //            // Note: calling callback while holding the lock is
       
    96 //            // dangerous and may lead to deadlocks.
       
    97 //            callback.run();
       
    98 //        }
       
    99 //    }
       
   100 
    89 
   101     @Override
    90     @Override
   102     public synchronized void close() {
    91     public synchronized void close() {
   103         closed = true;
    92         closed = true;
   104         notifyAll();
    93         notifyAll();
   130                 if (closed) {
   119                 if (closed) {
   131                     throw newIOException("Queue closed");
   120                     throw newIOException("Queue closed");
   132                 }
   121                 }
   133                 waiters--;
   122                 waiters--;
   134             }
   123             }
   135             return q.removeFirst();
   124             T item = q.removeFirst();
       
   125             if (item.equals(closeSentinel)) {
       
   126                 closed = true;
       
   127                 assert q.isEmpty();
       
   128             }
       
   129             return item;
   136         } catch (InterruptedException ex) {
   130         } catch (InterruptedException ex) {
   137             throw new IOException(ex);
   131             throw new IOException(ex);
   138         }
   132         }
   139     }
   133     }
   140 
   134 
   144         }
   138         }
   145 
   139 
   146         if (q.isEmpty()) {
   140         if (q.isEmpty()) {
   147             return null;
   141             return null;
   148         }
   142         }
   149         T res = q.removeFirst();
   143         return take();
   150         return res;
       
   151     }
   144     }
   152 
       
   153 //    public synchronized T[] pollAll(T[] type) throws IOException {
       
   154 //        T[] ret = q.toArray(type);
       
   155 //        q.clear();
       
   156 //        return ret;
       
   157 //    }
       
   158 
       
   159 //    public synchronized void pushback(T v) {
       
   160 //        q.addFirst(v);
       
   161 //    }
       
   162 
       
   163 //    public synchronized void pushbackAll(T[] v) {
       
   164 //        for (int i=v.length-1; i>=0; i--) {
       
   165 //            q.addFirst(v[i]);
       
   166 //        }
       
   167 //    }
       
   168 
   145 
   169     private IOException newIOException(String msg) {
   146     private IOException newIOException(String msg) {
   170         if (exception == null) {
   147         if (exception == null) {
   171             return new IOException(msg);
   148             return new IOException(msg);
   172         } else {
   149         } else {
   173             return new IOException(msg, exception);
   150             return new IOException(msg, exception);
   174         }
   151         }
   175     }
   152     }
   176 
       
   177 }
   153 }