84 synchronized (stateLock) { |
84 synchronized (stateLock) { |
85 tryChangeState(State.ERROR); |
85 tryChangeState(State.ERROR); |
86 } |
86 } |
87 } |
87 } |
88 }; |
88 }; |
89 transmitter = new WSTransmitter(executor, channel, errorHandler); |
89 transmitter = new WSTransmitter(this, executor, channel, errorHandler); |
90 receiver = new WSReceiver(this.listener, this, executor, channel); |
90 receiver = new WSReceiver(this.listener, this, executor, channel); |
91 } |
91 } |
92 |
92 |
93 private void start() { |
93 private void start() { |
94 receiver.start(); |
94 receiver.start(); |
95 } |
95 } |
96 |
96 |
97 @Override |
97 @Override |
98 public CompletableFuture<Void> sendText(CharSequence message, boolean isLast) { |
98 public CompletableFuture<WebSocket> sendText(CharSequence message, boolean isLast) { |
99 requireNonNull(message, "message"); |
99 requireNonNull(message, "message"); |
100 synchronized (stateLock) { |
100 synchronized (stateLock) { |
101 checkState(); |
101 checkState(); |
102 return transmitter.sendText(message, isLast); |
102 return transmitter.sendText(message, isLast); |
103 } |
103 } |
104 } |
104 } |
105 |
105 |
106 @Override |
106 @Override |
107 public CompletableFuture<Void> sendText(Stream<? extends CharSequence> message) { |
107 public CompletableFuture<WebSocket> sendText(Stream<? extends CharSequence> message) { |
108 requireNonNull(message, "message"); |
108 requireNonNull(message, "message"); |
109 synchronized (stateLock) { |
109 synchronized (stateLock) { |
110 checkState(); |
110 checkState(); |
111 return transmitter.sendText(message); |
111 return transmitter.sendText(message); |
112 } |
112 } |
113 } |
113 } |
114 |
114 |
115 @Override |
115 @Override |
116 public CompletableFuture<Void> sendBinary(ByteBuffer message, boolean isLast) { |
116 public CompletableFuture<WebSocket> sendBinary(ByteBuffer message, boolean isLast) { |
117 requireNonNull(message, "message"); |
117 requireNonNull(message, "message"); |
118 synchronized (stateLock) { |
118 synchronized (stateLock) { |
119 checkState(); |
119 checkState(); |
120 return transmitter.sendBinary(message, isLast); |
120 return transmitter.sendBinary(message, isLast); |
121 } |
121 } |
122 } |
122 } |
123 |
123 |
124 @Override |
124 @Override |
125 public CompletableFuture<Void> sendPing(ByteBuffer message) { |
125 public CompletableFuture<WebSocket> sendPing(ByteBuffer message) { |
126 requireNonNull(message, "message"); |
126 requireNonNull(message, "message"); |
127 synchronized (stateLock) { |
127 synchronized (stateLock) { |
128 checkState(); |
128 checkState(); |
129 return transmitter.sendPing(message); |
129 return transmitter.sendPing(message); |
130 } |
130 } |
131 } |
131 } |
132 |
132 |
133 @Override |
133 @Override |
134 public CompletableFuture<Void> sendPong(ByteBuffer message) { |
134 public CompletableFuture<WebSocket> sendPong(ByteBuffer message) { |
135 requireNonNull(message, "message"); |
135 requireNonNull(message, "message"); |
136 synchronized (stateLock) { |
136 synchronized (stateLock) { |
137 checkState(); |
137 checkState(); |
138 return transmitter.sendPong(message); |
138 return transmitter.sendPong(message); |
139 } |
139 } |
140 } |
140 } |
141 |
141 |
142 @Override |
142 @Override |
143 public CompletableFuture<Void> sendClose(CloseCode code, CharSequence reason) { |
143 public CompletableFuture<WebSocket> sendClose(CloseCode code, CharSequence reason) { |
144 requireNonNull(code, "code"); |
144 requireNonNull(code, "code"); |
145 requireNonNull(reason, "reason"); |
145 requireNonNull(reason, "reason"); |
146 synchronized (stateLock) { |
146 synchronized (stateLock) { |
147 return doSendClose(() -> transmitter.sendClose(code, reason)); |
147 return doSendClose(() -> transmitter.sendClose(code, reason)); |
148 } |
148 } |
149 } |
149 } |
150 |
150 |
151 @Override |
151 @Override |
152 public CompletableFuture<Void> sendClose() { |
152 public CompletableFuture<WebSocket> sendClose() { |
153 synchronized (stateLock) { |
153 synchronized (stateLock) { |
154 return doSendClose(() -> transmitter.sendClose()); |
154 return doSendClose(() -> transmitter.sendClose()); |
155 } |
155 } |
156 } |
156 } |
157 |
157 |
158 private CompletableFuture<Void> doSendClose(Supplier<CompletableFuture<Void>> s) { |
158 private CompletableFuture<WebSocket> doSendClose(Supplier<CompletableFuture<WebSocket>> s) { |
159 checkState(); |
159 checkState(); |
160 boolean closeChannel = false; |
160 boolean closeChannel = false; |
161 synchronized (stateLock) { |
161 synchronized (stateLock) { |
162 if (state == State.CLOSED_REMOTELY) { |
162 if (state == State.CLOSED_REMOTELY) { |
163 closeChannel = tryChangeState(State.CLOSED); |
163 closeChannel = tryChangeState(State.CLOSED); |
164 } else { |
164 } else { |
165 tryChangeState(State.CLOSED_LOCALLY); |
165 tryChangeState(State.CLOSED_LOCALLY); |
166 } |
166 } |
167 } |
167 } |
168 CompletableFuture<Void> sent = s.get(); |
168 CompletableFuture<WebSocket> sent = s.get(); |
169 if (closeChannel) { |
169 if (closeChannel) { |
170 sent.whenComplete((v, t) -> { |
170 sent.whenComplete((v, t) -> { |
171 try { |
171 try { |
172 channel.close(); |
172 channel.close(); |
173 } catch (IOException e) { |
173 } catch (IOException e) { |