前言
前面几篇介绍了Spring中Thrift的简单使用。但是在实际的工程应用中,多线程调用Thrfit服务端,还存在很多需要优化的点,本篇开始会对这些进行介绍。
超时时间
首先,在实际业务中,会根据使用场景、接口性能、稳定性等多方面原因,配置不同的超时时间,避免单服务异常拖垮整个应用。
下面结合代码来看下如何进行超时时间的配置。
2.1 配置文件
在配置文件application.yml中自定义配置,这里可以主要关注connectTimeOut和socketTimeOut
thrift:
properties:
userV1:
host: localhost
port: 9001
connectTimeOut: 500
socketTimeOut: 1000
userV2:
host: localhost
port: 9002
connectTimeOut: 200
socketTimeOut: 1000
2.2 自定义配置类
自定义两个配置类ThriftConfiguration和ThriftConfigProperties。其中ThriftConfigProperties用于定义Thrfit所需要的配置,ThriftConfiguration用于注入配置文件中的值。
@Configuration
@ConfigurationProperties("thrift")
@Data
public class ThriftConfiguration {
private Map<String, ThriftConfigProperties> properties;
}
@Data
public class ThriftConfigProperties {
private String host;
private int port;
int socketTimeout;
int connectTimeout;
}
2.3 bean配置
通过ThriftConfiguration获取到配置文件中的值,将其注入到userServiceClient中,根据对应的服务名(userV1)获取到对应的配置信息。在Bean初始化时进行装配。
@Configuration
public class ThriftClientConfig {
@Bean
public UserThriftService.Client userServiceClient(ThriftConfiguration thriftConfiguration) throws Exception {
ThriftConfigProperties properties = thriftConfiguration.getProperties().get("userV1");
TSocket transport = new TSocket(properties.getHost(), properties.getPort());
// 默认0则无超时时间
transport.setConnectTimeout(properties.getConnectTimeout());
transport.setSocketTimeout(properties.getSocketTimeout());
transport.open();
TBinaryProtocol protocol = new TBinaryProtocol(transport);
return new UserThriftService.Client(protocol);
}
}
以上即完成了超时时间的配置。
多线程访问Thrfit Server
在前面几篇的案例中,如果多线程访问Thrfit Server会发生异常,下面结合代码进行说明
/**
* @author Klein
* @date 2024-10-19 12:34
*/
@RestController
@RequestMapping("/user")
@Slf4j
public class UserController {
@Resource
private UserThriftService.Client userServiceClient;
private ExecutorService executorService = Executors.newFixedThreadPool(5);
@GetMapping("/list")
public List<User> findUsers() {
List<User> users = new ArrayList<>();
IntStream.rangeClosed(0, 9).forEach(e -> {
executorService.execute(() -> {
log.info("start:{}", e);
try {
users.add(userServiceClient.findUser(e));
} catch (TException exe) {
exe.printStackTrace();
}
});
});
log.info("users: {}", users);
return users;
}
}
上面的代码是模拟多线程访问Thrfit Server,执行会报如下错误:
org.apache.thrift.transport.TTransportException: Socket is closed by peer.
at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:177)
at org.apache.thrift.transport.TTransport.readAll(TTransport.java:100)
at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:519)
at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:387)
at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:271)
at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:79)
at ltd.klein.thrift.server.api.iface.UserThriftService$Client.recv_findUser(UserThriftService.java:64) at ltd.klein.thrift.server.api.iface.UserThriftService$Client.findUser(UserThriftService.java:51)
at ltd.klein.thrift.client.controller.UserController.lambda$findUsers$0(UserController.java:63)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
org.apache.thrift.transport.TTransportException: java.net.SocketException: Broken pipe (Write failed)
at org.apache.thrift.transport.TIOStreamTransport.flush(TIOStreamTransport.java:206)
at org.apache.thrift.TServiceClient.sendBase(TServiceClient.java:75)
at org.apache.thrift.TServiceClient.sendBase(TServiceClient.java:64)
at ltd.klein.thrift.server.api.iface.UserThriftService$Client.send_findUser(UserThriftService.java:58) at ltd.klein.thrift.server.api.iface.UserThriftService$Client.findUser(UserThriftService.java:50)
at ltd.klein.thrift.client.controller.UserController.lambda$findUsers$0(UserController.java:63)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.net.SocketException: Broken pipe (Write failed)
at java.base/java.net.SocketOutputStream.socketWrite0(Native Method)
at java.base/java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:110)
at java.base/java.net.SocketOutputStream.write(SocketOutputStream.java:150)
at java.base/java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:81)
at java.base/java.io.BufferedOutputStream.flush(BufferedOutputStream.java:142)
at org.apache.thrift.transport.TIOStreamTransport.flush(TIOStreamTransport.java:201)
... 8 more
原因是Thrfit Client不支持多线程调用同一个实例。但是实际场景中,基本上都会出现不同用户线程访问同一个Thrfit接口的场景。为了解决这个问题,就需要保证不同用户线程访问的是不同的实例。
本篇仅介绍其中一种方案:多例。即利用Spring Bean的scope。Spring Bean默认是单例的,所以不同线程访问的都是同一个Client实例,如果采用多例则可以解决上述问题。
3.1 多例实现
3.1.1 注册多例Bean
通过@Scope注解,进行多例配置。在调用处注入时也需要注意。
3.1.2 使用多例Bean
通过@Lookup注解实现多例bean的使用,如图
具体使用处如下所示:
package ltd.klein.thrift.client.controller;
import lombok.extern.slf4j.Slf4j;
import ltd.klein.thrift.server.api.iface.UserThriftService;
import ltd.klein.thrift.server.api.iface.UserThriftServiceV2;
import ltd.klein.thrift.server.api.param.User;
import ltd.klein.thrift.server.api.resp.UserResponse;
import org.apache.thrift.TException;
import org.springframework.beans.factory.annotation.Lookup;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;
/**
* @author Klein
* @date 2024-10-19 12:34
*/
@RestController
@RequestMapping("/user")
@Slf4j
public class UserController {
@Resource
private UserThriftServiceV2.Client userServiceClientV2;
@Lookup // 每次使用新的实例,也可以使用ObjectProvider
public UserThriftServiceV2.Client getUserServiceClientV2() {
return null;
}
private ExecutorService executorService = Executors.newFixedThreadPool(5);
@GetMapping("/v2/list")
public UserResponse findUsersV2() throws InterruptedException {
UserResponse userResponse = new UserResponse();
userResponse.setMsg("succ");
userResponse.setSuccess(true);
List<ltd.klein.thrift.server.api.resp.User> users = new CopyOnWriteArrayList<>();
userResponse.setUsers(users);
CountDownLatch countDownLatch = new CountDownLatch(10);
IntStream.rangeClosed(0, 9).forEach(i -> {
executorService.execute(() -> {
log.info("start:{}", i);
UserThriftServiceV2.Client client = getUserServiceClientV2();
log.info("client: {}", client);
try {
users.addAll(client.findUser(i).getUsers());
} catch (TException exe) {
exe.printStackTrace();
}
countDownLatch.countDown();
});
});
countDownLatch.await();
log.info("users: {}", users);
return userResponse;
}
}
总结
本篇介绍了工程实际使用时需要的超时时间配置,同时介绍了一种解决多线程调用Thrift的方案。不过此种方案并不是最优解法,下一篇将介绍更优的方案。如果大家有什么想法,欢迎分享与讨论。