Skip to content

Commit 27025e2

Browse files
committed
Shut down RSocket client examples cleanly
Signed-off-by: jeroen.veltman <jeroen.veltman@nextend.nl>
1 parent c49ebbf commit 27025e2

3 files changed

Lines changed: 99 additions & 69 deletions

File tree

rsocket-examples/src/main/java/io/rsocket/examples/transport/h3/client/RSocketClientExample.java

Lines changed: 33 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
import io.rsocket.core.RSocketConnector;
88
import io.rsocket.core.RSocketServer;
99
import io.rsocket.examples.transport.h3.Http3TransportFactory;
10+
import io.rsocket.examples.transport.support.ExampleLifecycle;
11+
import io.rsocket.transport.netty.server.CloseableChannel;
1012
import io.rsocket.util.DefaultPayload;
1113
import java.time.Duration;
1214
import org.slf4j.Logger;
@@ -19,36 +21,44 @@ public class RSocketClientExample {
1921

2022
public static void main(String[] args) {
2123

22-
RSocketServer.create(
23-
SocketAcceptor.forRequestResponse(
24-
p -> {
25-
String data = p.getDataUtf8();
26-
logger.info("Received request data {}", data);
24+
CloseableChannel server =
25+
RSocketServer.create(
26+
SocketAcceptor.forRequestResponse(
27+
p -> {
28+
String data = p.getDataUtf8();
29+
logger.info("Received request data {}", data);
2730

28-
Payload responsePayload = DefaultPayload.create("Echo: " + data);
29-
p.release();
31+
Payload responsePayload = DefaultPayload.create("Echo: " + data);
32+
p.release();
3033

31-
return Mono.just(responsePayload);
32-
}))
33-
.bind(Http3TransportFactory.server("localhost", 7000))
34-
.delaySubscription(Duration.ofSeconds(5))
35-
.doOnNext(cc -> logger.info("Server started on the address : {}", cc.address()))
36-
.block();
34+
return Mono.just(responsePayload);
35+
}))
36+
.bind(Http3TransportFactory.server("localhost", 7000))
37+
.delaySubscription(Duration.ofSeconds(5))
38+
.doOnNext(cc -> logger.info("Server started on the address : {}", cc.address()))
39+
.block();
3740

3841
Mono<RSocket> source =
3942
RSocketConnector.create()
4043
.reconnect(Retry.backoff(50, Duration.ofMillis(500)))
4144
.connect(Http3TransportFactory.client("localhost", 7000));
4245

43-
RSocketClient.from(source)
44-
.requestResponse(Mono.just(DefaultPayload.create("Test Request")))
45-
.doOnSubscribe(s -> logger.info("Executing Request"))
46-
.doOnNext(
47-
d -> {
48-
logger.info("Received response data {}", d.getDataUtf8());
49-
d.release();
50-
})
51-
.repeat(10)
52-
.blockLast();
46+
RSocketClient client = RSocketClient.from(source);
47+
48+
try {
49+
client
50+
.requestResponse(Mono.just(DefaultPayload.create("Test Request")))
51+
.doOnSubscribe(s -> logger.info("Executing Request"))
52+
.doOnNext(
53+
d -> {
54+
logger.info("Received response data {}", d.getDataUtf8());
55+
d.release();
56+
})
57+
.repeat(10)
58+
.blockLast();
59+
} finally {
60+
ExampleLifecycle.close(client);
61+
ExampleLifecycle.close(server);
62+
}
5363
}
5464
}

rsocket-examples/src/main/java/io/rsocket/examples/transport/quic/client/RSocketClientExample.java

Lines changed: 33 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
import io.rsocket.core.RSocketConnector;
88
import io.rsocket.core.RSocketServer;
99
import io.rsocket.examples.transport.quic.QuicTransportFactory;
10+
import io.rsocket.examples.transport.support.ExampleLifecycle;
11+
import io.rsocket.transport.netty.server.CloseableChannel;
1012
import io.rsocket.util.DefaultPayload;
1113
import java.time.Duration;
1214
import org.slf4j.Logger;
@@ -19,36 +21,44 @@ public class RSocketClientExample {
1921

2022
public static void main(String[] args) {
2123

22-
RSocketServer.create(
23-
SocketAcceptor.forRequestResponse(
24-
p -> {
25-
String data = p.getDataUtf8();
26-
logger.info("Received request data {}", data);
24+
CloseableChannel server =
25+
RSocketServer.create(
26+
SocketAcceptor.forRequestResponse(
27+
p -> {
28+
String data = p.getDataUtf8();
29+
logger.info("Received request data {}", data);
2730

28-
Payload responsePayload = DefaultPayload.create("Echo: " + data);
29-
p.release();
31+
Payload responsePayload = DefaultPayload.create("Echo: " + data);
32+
p.release();
3033

31-
return Mono.just(responsePayload);
32-
}))
33-
.bind(QuicTransportFactory.server("localhost", 7000))
34-
.delaySubscription(Duration.ofSeconds(5))
35-
.doOnNext(cc -> logger.info("Server started on the address : {}", cc.address()))
36-
.block();
34+
return Mono.just(responsePayload);
35+
}))
36+
.bind(QuicTransportFactory.server("localhost", 7000))
37+
.delaySubscription(Duration.ofSeconds(5))
38+
.doOnNext(cc -> logger.info("Server started on the address : {}", cc.address()))
39+
.block();
3740

3841
Mono<RSocket> source =
3942
RSocketConnector.create()
4043
.reconnect(Retry.backoff(50, Duration.ofMillis(500)))
4144
.connect(QuicTransportFactory.client("localhost", 7000));
4245

43-
RSocketClient.from(source)
44-
.requestResponse(Mono.just(DefaultPayload.create("Test Request")))
45-
.doOnSubscribe(s -> logger.info("Executing Request"))
46-
.doOnNext(
47-
d -> {
48-
logger.info("Received response data {}", d.getDataUtf8());
49-
d.release();
50-
})
51-
.repeat(10)
52-
.blockLast();
46+
RSocketClient client = RSocketClient.from(source);
47+
48+
try {
49+
client
50+
.requestResponse(Mono.just(DefaultPayload.create("Test Request")))
51+
.doOnSubscribe(s -> logger.info("Executing Request"))
52+
.doOnNext(
53+
d -> {
54+
logger.info("Received response data {}", d.getDataUtf8());
55+
d.release();
56+
})
57+
.repeat(10)
58+
.blockLast();
59+
} finally {
60+
ExampleLifecycle.close(client);
61+
ExampleLifecycle.close(server);
62+
}
5363
}
5464
}

rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/client/RSocketClientExample.java

Lines changed: 33 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66
import io.rsocket.core.RSocketClient;
77
import io.rsocket.core.RSocketConnector;
88
import io.rsocket.core.RSocketServer;
9+
import io.rsocket.examples.transport.support.ExampleLifecycle;
910
import io.rsocket.transport.netty.client.TcpClientTransport;
11+
import io.rsocket.transport.netty.server.CloseableChannel;
1012
import io.rsocket.transport.netty.server.TcpServerTransport;
1113
import io.rsocket.util.DefaultPayload;
1214
import java.time.Duration;
@@ -20,36 +22,44 @@ public class RSocketClientExample {
2022

2123
public static void main(String[] args) {
2224

23-
RSocketServer.create(
24-
SocketAcceptor.forRequestResponse(
25-
p -> {
26-
String data = p.getDataUtf8();
27-
logger.info("Received request data {}", data);
25+
CloseableChannel server =
26+
RSocketServer.create(
27+
SocketAcceptor.forRequestResponse(
28+
p -> {
29+
String data = p.getDataUtf8();
30+
logger.info("Received request data {}", data);
2831

29-
Payload responsePayload = DefaultPayload.create("Echo: " + data);
30-
p.release();
32+
Payload responsePayload = DefaultPayload.create("Echo: " + data);
33+
p.release();
3134

32-
return Mono.just(responsePayload);
33-
}))
34-
.bind(TcpServerTransport.create("localhost", 7000))
35-
.delaySubscription(Duration.ofSeconds(5))
36-
.doOnNext(cc -> logger.info("Server started on the address : {}", cc.address()))
37-
.block();
35+
return Mono.just(responsePayload);
36+
}))
37+
.bind(TcpServerTransport.create("localhost", 7000))
38+
.delaySubscription(Duration.ofSeconds(5))
39+
.doOnNext(cc -> logger.info("Server started on the address : {}", cc.address()))
40+
.block();
3841

3942
Mono<RSocket> source =
4043
RSocketConnector.create()
4144
.reconnect(Retry.backoff(50, Duration.ofMillis(500)))
4245
.connect(TcpClientTransport.create("localhost", 7000));
4346

44-
RSocketClient.from(source)
45-
.requestResponse(Mono.just(DefaultPayload.create("Test Request")))
46-
.doOnSubscribe(s -> logger.info("Executing Request"))
47-
.doOnNext(
48-
d -> {
49-
logger.info("Received response data {}", d.getDataUtf8());
50-
d.release();
51-
})
52-
.repeat(10)
53-
.blockLast();
47+
RSocketClient client = RSocketClient.from(source);
48+
49+
try {
50+
client
51+
.requestResponse(Mono.just(DefaultPayload.create("Test Request")))
52+
.doOnSubscribe(s -> logger.info("Executing Request"))
53+
.doOnNext(
54+
d -> {
55+
logger.info("Received response data {}", d.getDataUtf8());
56+
d.release();
57+
})
58+
.repeat(10)
59+
.blockLast();
60+
} finally {
61+
ExampleLifecycle.close(client);
62+
ExampleLifecycle.close(server);
63+
}
5464
}
5565
}

0 commit comments

Comments
 (0)