Thrift 04|Thrift工程化实践 - 超时时间与多线程调用

Thrift 04|Thrift工程化实践 - 超时时间与多线程调用

精选文章moguli202025-01-17 11:06:4632A+A-

前言

前面几篇介绍了Spring中Thrift的简单使用。但是在实际的工程应用中,多线程调用Thrfit服务端,还存在很多需要优化的点,本篇开始会对这些进行介绍。


超时时间

首先,在实际业务中,会根据使用场景、接口性能、稳定性等多方面原因,配置不同的超时时间,避免单服务异常拖垮整个应用。

下面结合代码来看下如何进行超时时间的配置。

2.1 配置文件

在配置文件application.yml中自定义配置,这里可以主要关注connectTimeOutsocketTimeOut

thrift:
  properties:
    userV1:
      host: localhost
      port: 9001
      connectTimeOut: 500
      socketTimeOut: 1000
    userV2:
      host: localhost
      port: 9002
      connectTimeOut: 200
      socketTimeOut: 1000


2.2 自定义配置类

自定义两个配置类ThriftConfiguration和ThriftConfigProperties。其中ThriftConfigProperties用于定义Thrfit所需要的配置,ThriftConfiguration用于注入配置文件中的值。

@Configuration
@ConfigurationProperties("thrift")
@Data
public class ThriftConfiguration {

   private Map<String, ThriftConfigProperties> properties;

}


@Data
public class ThriftConfigProperties {

   private String host;

   private int port;

   int socketTimeout;

   int connectTimeout;
}


2.3 bean配置

通过ThriftConfiguration获取到配置文件中的值,将其注入到userServiceClient中,根据对应的服务名(userV1)获取到对应的配置信息。在Bean初始化时进行装配。

@Configuration
public class ThriftClientConfig {

   @Bean
   public UserThriftService.Client userServiceClient(ThriftConfiguration thriftConfiguration) throws Exception {
      ThriftConfigProperties properties = thriftConfiguration.getProperties().get("userV1");
      TSocket transport = new TSocket(properties.getHost(), properties.getPort());
      // 默认0则无超时时间
      transport.setConnectTimeout(properties.getConnectTimeout());
      transport.setSocketTimeout(properties.getSocketTimeout());
      transport.open();

      TBinaryProtocol protocol = new TBinaryProtocol(transport);
      return new UserThriftService.Client(protocol);
   }

}

以上即完成了超时时间的配置。


多线程访问Thrfit Server

在前面几篇的案例中,如果多线程访问Thrfit Server会发生异常,下面结合代码进行说明

/**
 * @author Klein
 * @date 2024-10-19 12:34
 */
@RestController
@RequestMapping("/user")
@Slf4j
public class UserController {

   @Resource
   private UserThriftService.Client userServiceClient;

   private ExecutorService executorService = Executors.newFixedThreadPool(5);

   @GetMapping("/list")
   public List<User> findUsers() {
      List<User> users = new ArrayList<>();
      IntStream.rangeClosed(0, 9).forEach(e -> {
         executorService.execute(() -> {
            log.info("start:{}", e);
            try {
               users.add(userServiceClient.findUser(e));
            } catch (TException exe) {
               exe.printStackTrace();
            }
         });
      });
      log.info("users: {}", users);
      return users;
   }

}

上面的代码是模拟多线程访问Thrfit Server,执行会报如下错误:

org.apache.thrift.transport.TTransportException: Socket is closed by peer.

at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:177)

at org.apache.thrift.transport.TTransport.readAll(TTransport.java:100)

at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:519)

at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:387)

at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:271)

at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:79)

at ltd.klein.thrift.server.api.iface.UserThriftService$Client.recv_findUser(UserThriftService.java:64) at ltd.klein.thrift.server.api.iface.UserThriftService$Client.findUser(UserThriftService.java:51)

at ltd.klein.thrift.client.controller.UserController.lambda$findUsers$0(UserController.java:63)

at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)

at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)

at java.base/java.lang.Thread.run(Thread.java:834)

org.apache.thrift.transport.TTransportException: java.net.SocketException: Broken pipe (Write failed)

at org.apache.thrift.transport.TIOStreamTransport.flush(TIOStreamTransport.java:206)

at org.apache.thrift.TServiceClient.sendBase(TServiceClient.java:75)

at org.apache.thrift.TServiceClient.sendBase(TServiceClient.java:64)

at ltd.klein.thrift.server.api.iface.UserThriftService$Client.send_findUser(UserThriftService.java:58) at ltd.klein.thrift.server.api.iface.UserThriftService$Client.findUser(UserThriftService.java:50)

at ltd.klein.thrift.client.controller.UserController.lambda$findUsers$0(UserController.java:63)

at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)

at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)

at java.base/java.lang.Thread.run(Thread.java:834)

Caused by: java.net.SocketException: Broken pipe (Write failed)

at java.base/java.net.SocketOutputStream.socketWrite0(Native Method)

at java.base/java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:110)

at java.base/java.net.SocketOutputStream.write(SocketOutputStream.java:150)

at java.base/java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:81)

at java.base/java.io.BufferedOutputStream.flush(BufferedOutputStream.java:142)

at org.apache.thrift.transport.TIOStreamTransport.flush(TIOStreamTransport.java:201)

... 8 more


原因是Thrfit Client不支持多线程调用同一个实例。但是实际场景中,基本上都会出现不同用户线程访问同一个Thrfit接口的场景。为了解决这个问题,就需要保证不同用户线程访问的是不同的实例。

本篇仅介绍其中一种方案:多例。即利用Spring Bean的scope。Spring Bean默认是单例的,所以不同线程访问的都是同一个Client实例,如果采用多例则可以解决上述问题。


3.1 多例实现

3.1.1 注册多例Bean

通过@Scope注解,进行多例配置。在调用处注入时也需要注意。

3.1.2 使用多例Bean

通过@Lookup注解实现多例bean的使用,如图

具体使用处如下所示:

package ltd.klein.thrift.client.controller;

import lombok.extern.slf4j.Slf4j;
import ltd.klein.thrift.server.api.iface.UserThriftService;
import ltd.klein.thrift.server.api.iface.UserThriftServiceV2;
import ltd.klein.thrift.server.api.param.User;
import ltd.klein.thrift.server.api.resp.UserResponse;
import org.apache.thrift.TException;
import org.springframework.beans.factory.annotation.Lookup;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

/**
 * @author Klein
 * @date 2024-10-19 12:34
 */
@RestController
@RequestMapping("/user")
@Slf4j
public class UserController {

   @Resource
   private UserThriftServiceV2.Client userServiceClientV2;

   @Lookup // 每次使用新的实例,也可以使用ObjectProvider
   public UserThriftServiceV2.Client getUserServiceClientV2() {
      return null;
   }

   private ExecutorService executorService = Executors.newFixedThreadPool(5);

   @GetMapping("/v2/list")
   public UserResponse findUsersV2() throws InterruptedException {
      UserResponse userResponse = new UserResponse();
      userResponse.setMsg("succ");
      userResponse.setSuccess(true);
      List<ltd.klein.thrift.server.api.resp.User> users = new CopyOnWriteArrayList<>();
      userResponse.setUsers(users);
      CountDownLatch countDownLatch = new CountDownLatch(10);
      IntStream.rangeClosed(0, 9).forEach(i -> {
         executorService.execute(() -> {
            log.info("start:{}", i);
            UserThriftServiceV2.Client client = getUserServiceClientV2();
            log.info("client: {}", client);
            try {
               users.addAll(client.findUser(i).getUsers());
            } catch (TException exe) {
               exe.printStackTrace();
            }
            countDownLatch.countDown();
         });
      });
      countDownLatch.await();
      log.info("users: {}", users);
      return userResponse;
   }
}

总结

本篇介绍了工程实际使用时需要的超时时间配置,同时介绍了一种解决多线程调用Thrift的方案。不过此种方案并不是最优解法,下一篇将介绍更优的方案。如果大家有什么想法,欢迎分享与讨论。

点击这里复制本文地址 以上内容由莫古技术网整理呈现,请务必在转载分享时注明本文地址!如对内容有疑问,请联系我们,谢谢!
qrcode

莫古技术网 © All Rights Reserved.  滇ICP备2024046894号-2