目录
- 正文
- 1:如何运行项目
- 2:从客户端调用开始(springboot-zk-study项目)
- 3:服务端处理请求
- 4:接下来要做什么
正文
项目地址:gitee.com/baojh123/rp…
netty-study 这个项目是没用到的,可以删掉,主要是测试Netty自定义协议的
1:如何运行项目
1:本地起一个zookeeper服务
2: 只需要运行 rpc-server 和 springboot-zk-study二个项目即可
3: 二个项目的application.yml 都不需要改,唯一要改的就是zookeepr的连接配置信息
4:启动好之后,在浏览器访问
http://localhost:8081/zk/test
http://localhost:8081/zk/people
http://localhost:8081/zk/list
可以查看到返回结果
2:从客户端调用开始(springboot-zk-study项目)
@RestController @RequestMapping("/zk") public class ZkController { @Resource @MyResource private UserService userService; @Resource @MyResource private PeopleService peopleService; @GetMapping("/test") public String test() { return userService.test("bjh-",1)javascript; 编程客栈 } @GetMapping("/people") public Object people() { return peopleService.query(1L); } @GetMapping("/list") public Object list() { return peopleService.list(); } }
只需要在我们需要进行RPC调用的接口上添加 @MyResource 注解即可,当我们执行这个方法之后,就会执行代理方法,代理方法在 rpc-core 项目中,为了阅读清晰,我只贴出重点的方法
@Slf4j public class ServiceProxy<T> implements InvocationHandler, ApplicationContextAware, ApplicationRunner { ......省略一些代码 // 客户端执行方法之后,就会执行到这里的代理方法 @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { //从注册中心拿到服务列表 ZkNodeData zkNodeData = objectMapper.readValue(nodeData, ZkNodeData.class); List<ZkProperties> zkPropertiesList = zkNodeData.getZkPropertiesList(); for(ZkProperties zkProperties : zkPropertiesList) { String interfaceName = zkProperties.getInterfaceName(); Class<?> declaringClass = method.getDeclaringClass(); if(StringUtils.equals(declaringClass.getName(),interfaceName)) { List<InterfaceInfo> info = zkProperties.getInfo(); InterfaceInfo interfaceInfo = info.get(0); String ipAddress = interfaceInfo.getIpAddress(); List<InterfaceImplInfo> interfaceImplInfo = interfaceInfo.getInterfaceImplInfo(); InterfaceImplInfo implInfo = interfaceImplInfo.get(0); String[] strings = ipAddress.split(":"); //与远程Netty服务端发起连接 RpcClient rpcClient = connNettyServer(strings[0], zkPropertiesSource.getNettyConnectPort()); /** * 封装请求参数 */ //获取方法参数类型 Class<?>[] pandroidarameterTypes = method.getParameterTypes(); List<String> types = getTy开发者_Go学习pes(parameterTypes); //同步调用 result = remoteCalljs(method.getName(), types, args, rpcClient, implInfo, interfaceName); log.info("返回结果是:{}",result); } } Class<?> returnType = method.getReturnType(); Object value = objectMapper.readValue(result.toString(), returnType); return value; } private RpcClient connNettyServer(String ipAddress,Integer port) { return new RpcClient(ipAddress,port); } private Object remoteCall(String methodName, List<String> argTypes, Object[] args,RpcClient rpcClient,InterfaceImplInfo implInfo,String interfaceName) throws Exception{ RpcMessage rpcMessage = new RpcMessage(); ...... //发送请求 Response result = rpcClient.sendRequest(rpcMessage); log.info("请求结果是:{}", jsONUtil.toJsonPrettyStr(result)); return result.getData(); } ......省略一些代码
我们初始化客户端连接和发送请求都在一个RpcClient的类中,我们看下这个类的代码
@Slf4j public class RpcClient { EventLoopGroup group = new NioEventLoopGroup(); Bootstrap bootstrap; private String ip; private Integer port; RpcClientHandler rpcClientHandler; private ChannelFuture channelFuture; public RpcClient(String ip,Integer port) { bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NIOSocketChannel.class) // 使用NioSocketChannel作为客户端的通道实现 .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { //加入处理器 rpcClientHandler = new RpcClientHandler(); ch.pipeline().addLast(new RpcDecoder()); ch.pipeline().addLast(new RpcEncoder()); ch.pipeline().addLast(rpcClientHandler); } }); try { // 和远程Nett服务端建立连接 channelFuture = bootstrap.connect(ip, port).sync(); } catch (InterruptedException e) { e.printStackTrace(); } } public Response sendRequest(RpcMessage rpcMessage) throws Exception{ //发送请求 channelFuture.channel().writeAndFlush(rpcMessage).sync(); channelFuture.channel().closeFuture().sync(); log.info("获取返回结果====================="); Response response = rpcClientHandler.getResponse(); return response; } }
客户端在这发送请求到服务端之后,就接收服务端返回回来的消息即可,然后将返回结果返回给我们的接口。客户端的调用就到这里了,现在看下服务端的
3:服务端处理请求
服务端处理请求的核心都在 rpc-core的 RpcServerHandler中
public class RpcServerHandler extends SimpleChannelInboundHandler<RpcMessage> { ObjectMapper objectMapper = new ObjectMapper(); @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcMessage rpcMessage) throws Exception { Object obj = rpcMessage.getObj(); RpcMessage rpcMessageResponse = new RpcMessage(); Response response = new Response(); try{ Request request = objectMapper.readValue(obj.toString(), Request.class); String interfaceImplName = request.getInterfaceImplName(); Class<?> aClass = Class.forName(interfaceImplName); List<String> paramsTypes = request.getParamsTypes(); try { Object result = null; //判读方法是有参数的还是没有参数的 if(paramsTypes.isEmpty()) { Method declaredMethod = aClass.getDeclaredMethod(request.getMethodName()); result = declaredMethod.invoke(aClass.newInstance()); }else { Map<String, Object> paramsObjectMap = TypeParseUtil.parseTypeString2Class(paramsTypes, request.getParams().toArray()); Class<?>[] classTypes = (Class<?>[]) paramsObjectMap.get("classTypes"); Object[] args = (Object[]) paramsObjectMap.get("args"); result = aClass.getMethod(request.getMethodName(), classTypes).invoke(aClass.newInstance(), args); } log.info("返回结果是:{}",result); response.setData(objectMapper.writeValueAsString(result)); response.setIsOk(1); response.setErrInfo("error"); rpcMessageResponse.setObj(response); } catch (Throwable throwable) { throwable.printStackTrace(); response.setjavascriptData("error"); response.setIsOk(0); response.setErrInfo(throwable.getMessage()); rpcMessageResponse.setObj(response); } }catch (Exception e) { response.setData("error"); response.setIsOk(0); response.setErrInfo(e.getMessage()); rpcMessageResponse.setObj(response); } String valueAsString = objectMapper.writeValueAsString(response); rpcMessageResponse.setDataLength(valueAsString.getBytes(Charset.forName("utf-8")).length); rpcMessageResponse.setObj(valueAsString); channelHandlerContext.writeAndFlush(rpcMessageResponse); } }
服务端就拿到客户端传过来的接口名称,从zookeeper获取到具体的实现类,然后通过反射调用即可
4:接下来要做什么
上面只是简单的介绍了下整个调用的大概过程,还有很多问题没有解释清楚,比如
1:在客户端我们要使用UserService,但是你会发现我们使用了二个注解,一个是我们自定义的,一个是spring注入用的,但是在项目中我们并没有这个接口的实现类,spring是怎么将这个接口注入到自己容器中的呢
2: 为什么调用使用了 @MyResource的接口方法都会走代理方法,是怎么做到的
@Resource @MyResource private PeopleService peopleService;
3:我们的服务是怎么在服务启动的时候注册到zookeeper的,注册的信息又是什么,可以看下我们服务注册到zookeeper的信息如下
{ "zkPropertiesList": [{ "interfaceName": "com.bjh.service.PeopleService", "info": [{ "ipAddress": "192.168.83.1:9091", "interfaceImplInfo": [{ "name": "com.bjh.service.PeopleServiceImpl", "value": "com.bjh.service.PeopleServiceImpl" }] }] }, { "interfaceName": "com.bjh.service.UserService", "info": [{ "ipAddress": "192.168.83.1:9091", "interfaceImplInfo": [{ "name": "com.bjh.service.UserServiceImpl", "value": "com.bjh.service.UserServiceImpl" }] }] }] }
4:在我们的服务端的实现类,我们只使用了我们自定义的 @Service注解,这个注解不是Spring的
@Service public class PeopleServiceImpl implements PeopleService{ @Override public People query(long id) { People people = new People(); people.setId(id); people.setName("coco"); return people; } @Override public List<People> list() { List<People> list = new ArrayList<>(); People people = new People(); people.setId(123L); people.setName("coco"); People people2 = new People(); people2.setId(124L); people2.setName("baojh"); list.add(people); list.add(people2); return list; } }
5:还有客户端请求的结构体是怎么样的,还有返回响应结果是怎么样的等等,后续我会继续更新
更多关于Netty简易版RPC框架的资料请关注我们其它相关文章!
精彩评论