这篇文章主要介绍“怎么使用Netty实现类似Dubbo的远程接口调用”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“怎么使用Netty实现类似Dubbo的远程接口调用”文章能帮助大家解决问题。
一、Netty简介
Netty 是一个基于NIO的客户、服务器端的编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端应用。Netty相当于简化和流线化了网络应用的编程开发过程,例如:基于TCP和UDP的socket服务开发。
实现步骤:
创建接口和实现类
创建客户端代码
通过动态代理模式,封装netty远程接口调用
通过异步线程等待/通知,实现异步转同步
创建服务端代码
自定义编码解码器
编写测试客户端发送请求代码
二、完整代码实现
工程依赖引入
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.75.Final</version> </dependency>
1、创建接口和实现类
定义简单接口和实现类。通过注解定义接口和服务实现,在后续代码中解析注解。
@ServiceEntry定义接口serviceId
@MyService定义服务实现类
public interface IHelloService { @ServiceEntry(serviceId = "001", name = "hello") String hello(String msg); } @MyService public class HelloServiceImpl implements IHelloService { @Override public String hello(String msg) { return "re:这里是服务端,已收到客户端消息:" + msg.hashCode(); } }
@Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) public @interface ServiceEntry { /** * 服务Id */ String serviceId(); /** * 服务名称 */ String name() default ""; } @Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) public @interface MyService { String value() default ""; }
2、客户端代码实现及动态代理和异步转同步
1)创建客户端Handler,MyClientHandler,继承ChannelInboundHandlerAdapter,并实现Callable接口
客户端发送请求时,会调用call方法,在这里将异步转同步
将请求context放入map,并等待线程,在收到服务端返回时,异步通知线程执行,返回结果数据
收到服务端返回时,设置返回结果数据,并通知线程执行
public class MyClientHandler extends ChannelInboundHandlerAdapter implements Callable<String> { private ChannelHandlerContext ctx; private ConcurrentHashMap<String, SyncSendContext> syncSendContextMap = new ConcurrentHashMap<>(); private Object[] param; private String serviceId; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("客户端和服务端链接成功"); this.ctx = ctx; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("客户端收到服务端回复: " + msg); ResponseData data = (ResponseData) msg; String id = data.getId(); // 收到服务端返回时,设置返回结果数据,并通知线程执行 SyncSendContext context = syncSendContextMap.get(id); context.setResp(data); synchronized (context) { context.notify(); } } @Override public String call() throws Exception { System.out.println("客户端向服务端发送消息: " + param[0].toString()); String id = UUID.randomUUID().toString(); RequestData data = new RequestData(); data.setId(id); //强制设置参数1 data.setData(param[0].toString()); data.setServiceId(serviceId); SyncSendContext context = new SyncSendContext(); context.setRequest(data); // 将请求context放入map,并等待线程,在收到服务端返回时,异步通知线程执行,返回结果数据 syncSendContextMap.put(id, context); synchronized (context) { ctx.writeAndFlush(data); context.wait(); return (String) context.getResp().getData(); } } public void setParam(Object[] param) { this.param = param; } public void setServiceId(String serviceId) { this.serviceId = serviceId; } }
2)创建客户端代码,MyClient
通过动态代理,包装远程服务请求
初始化服务端链接,通过双检锁确保clientHandler是单例实现
发送请求时,通过线程池异步发送clientHandler
public class MyClient { private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); private MyClientHandler clientHandler; // 通过动态代理,包装远程服务请求 public <T> T getServie(final Class<T> service) { return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class<?>[]{service}, (proxy, method, args) -> { if (clientHandler == null) { init("127.0.0.1", 7000); } ServiceEntry annotation = method.getAnnotation(ServiceEntry.class); if (annotation == null) { return null; } clientHandler.setParam(args); clientHandler.setServiceId(annotation.serviceId()); return executor.submit(clientHandler).get(); }); } // 初始化服务端链接,通过双检锁确保clientHandler是单例实现 private synchronized void init(String hostname, int port) { if (clientHandler != null) { return; } clientHandler = new MyClientHandler(); EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap().group(group) .channel(NioSocketChannel.class) .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringEncoder()); pipeline.addLast(new ResponseMessageCodec()); pipeline.addLast(new RequestMessageCodec()); pipeline.addLast(clientHandler); } }); bootstrap.connect(hostname, port).sync(); } catch (Exception e) { e.printStackTrace(); } } }
3、服务端代码实现
1)创建服务工程类ServiceFacatory,解析注解保存服务接口和实现类,调用的时候从Map直接获取
public class ServiceFacatory { private static final Map<String, Method> methodMap = new HashMap<>(); private static final Map<String, Object> serviceMap = new HashMap<>(); public static void init() throws Exception { // 要扫描的包 String packages = "com.hj.netty.dubbo.api"; Set<MethodInfo> methods = PackageUtils.findClassAnnotationMethods(packages, ServiceEntry.class); for (MethodInfo info : methods) { ServiceEntry serviceEntry = (ServiceEntry) info.getAnnotation(); methodMap.put(serviceEntry.serviceId(), info.getMethod()); String serviceName = info.getMethod().getDeclaringClass().getName(); if (!serviceMap.containsKey(serviceName)) { Object instance = info.getMethod().getDeclaringClass().newInstance(); serviceMap.put(serviceName, instance); } } } public static Object invoke(String serviceId, Object args) throws Exception { Method method = methodMap.get(serviceId); String serviceName = method.getDeclaringClass().getName(); Object instance = serviceMap.get(serviceName); Object result = method.invoke(instance, args); return result; } } @Data @AllArgsConstructor public class MethodInfo { private Annotation annotation; private Method method; }
2)包解析工具类,解析指定目录下的所有service类
import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.core.io.Resource; import org.springframework.core.io.support.PathMatchingResourcePatternResolver; import org.springframework.core.io.support.ResourcePatternResolver; import org.springframework.core.type.classreading.CachingMetadataReaderFactory; import org.springframework.core.type.classreading.MetadataReader; import org.springframework.core.type.classreading.MetadataReaderFactory; import org.springframework.util.SystemPropertyUtils; import java.lang.annotation.Annotation; import java.lang.reflect.Method; import java.lang.reflect.Modifier; import java.lang.reflect.Parameter; import java.util.*; public class PackageUtils { private final static Logger log = LoggerFactory.getLogger(PackageUtils.class); //扫描 scanPackages 下的文件的匹配符 protected static final String DEFAULT_RESOURCE_PATTERN = "**/*.class"; /** * 结合spring的类扫描方式 * 根据需要扫描的包路径及相应的注解,获取最终测method集合 * 仅返回public方法,如果方法是非public类型的,不会被返回 * 可以扫描工程下的class文件及jar中的class文件 * * @param scanPackages * @param annotation * @return */ public static Set<MethodInfo> findClassAnnotationMethods(String scanPackages, Class<? extends Annotation> annotation) { //获取所有的类 Set<String> clazzSet = findPackageClass(scanPackages); Set<MethodInfo> methods = new HashSet<>(); //遍历类,查询相应的annotation方法 for (String clazz : clazzSet) { try { Set<MethodInfo> ms = findAnnotationMethods(clazz, annotation); methods.addAll(ms); } catch (ClassNotFoundException ignore) { } } return methods; } public static Set<MethodInfo> findAnnotationMethods(String fullClassName, Class<? extends Annotation> anno) throws ClassNotFoundException { Set<MethodInfo> methodSet = new HashSet<>(); Class<?> clz = Class.forName(fullClassName); // 存储接口中定义的方法 Map<String, Method> mapMethodInf = new HashMap<>(); for (int i = 0; i < clz.getInterfaces().length; i++) { Class<?> inf = clz.getInterfaces()[i]; Method[] methods = inf.getDeclaredMethods(); for (Method method : methods) { String key = getMethodKey(method); mapMethodInf.put(key, method); } } Method[] methods = clz.getDeclaredMethods(); for (Method method : methods) { if (method.getModifiers() != Modifier.PUBLIC) { continue; } Annotation annotation = method.getAnnotation(anno); if (annotation != null) { methodSet.add(new MethodInfo(annotation,method)); } else { // 从接口中读取对应的方法 String key = getMethodKey(method); Method methodInf = mapMethodInf.get(key); annotation = methodInf.getAnnotation(anno); if (annotation != null) { methodSet.add(new MethodInfo(annotation,method)); } } } return methodSet; } /** * 根据扫描包的,查询下面的所有类 * * @param scanPackages 扫描的package路径 * @return */ private static Set<String> findPackageClass(String scanPackages) { if (StringUtils.isBlank(scanPackages)) { return Collections.EMPTY_SET; } //验证及排重包路径,避免父子路径多次扫描 Set<String> packages = checkPackage(scanPackages); ResourcePatternResolver resourcePatternResolver = new PathMatchingResourcePatternResolver(); MetadataReaderFactory metadataReaderFactory = new CachingMetadataReaderFactory(resourcePatternResolver); Set<String> clazzSet = new HashSet<String>(); for (String basePackage : packages) { if (StringUtils.isBlank(basePackage)) { continue; } String packageSearchPath = ResourcePatternResolver.CLASSPATH_ALL_URL_PREFIX + org.springframework.util.ClassUtils.convertClassNameToResourcePath(SystemPropertyUtils.resolvePlaceholders(basePackage)) + "/" + DEFAULT_RESOURCE_PATTERN; try { Resource[] resources = resourcePatternResolver.getResources(packageSearchPath); for (Resource resource : resources) { //检查resource,这里的resource都是class String clazz = loadClassName(metadataReaderFactory, resource); clazzSet.add(clazz); } } catch (Exception e) { log.error("获取包下面的类信息失败,package:" + basePackage, e); } } return clazzSet; } /** * 排重、检测package父子关系,避免多次扫描 * * @param scanPackages * @return 返回检查后有效的路径集合 */ private static Set<String> checkPackage(String scanPackages) { if (StringUtils.isBlank(scanPackages)) { return Collections.EMPTY_SET; } Set<String> packages = new HashSet<>(); //排重路径 Collections.addAll(packages, scanPackages.split(",")); String[] strings = packages.toArray(new String[packages.size()]); for (String pInArr : strings) { if (StringUtils.isBlank(pInArr) || pInArr.equals(".") || pInArr.startsWith(".")) { continue; } if (pInArr.endsWith(".")) { pInArr = pInArr.substring(0, pInArr.length() - 1); } Iterator<String> packageIte = packages.iterator(); boolean needAdd = true; while (packageIte.hasNext()) { String pack = packageIte.next(); if (pInArr.startsWith(pack + ".")) { //如果待加入的路径是已经加入的pack的子集,不加入 needAdd = false; } else if (pack.startsWith(pInArr + ".")) { //如果待加入的路径是已经加入的pack的父集,删除已加入的pack packageIte.remove(); } } if (needAdd) { packages.add(pInArr); } } return packages; } /** * 加载资源,根据resource获取className * * @param metadataReaderFactory spring中用来读取resource为class的工具 * @param resource 这里的资源就是一个Class */ private static String loadClassName(MetadataReaderFactory metadataReaderFactory, Resource resource) { try { if (resource.isReadable()) { MetadataReader metadataReader = metadataReaderFactory.getMetadataReader(resource); if (metadataReader != null) { return metadataReader.getClassMetadata().getClassName(); } } } catch (Exception e) { log.error("根据resource获取类名称失败", e); } return null; } private static String getMethodKey(Method method) { StringBuilder key = new StringBuilder(method.getName()); for (Parameter parameter : method.getParameters()) { key.append(parameter.getType().getName()) .append(parameter.getName()); } return key.toString(); } }
3)创建服务端Handler类,接收客户端请求,并调用服务实现类执行接口
public class MyServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("客户端接入"); super.channelActive(ctx); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("收到客户端消息:" + msg); RequestData req = (RequestData) msg; if (req != null) { String args = req.getData(); String serviceId = req.getServiceId(); // 调用服务实现类 Object res = ServiceFacatory.invoke(serviceId, args); ResponseData resp = new ResponseData(); resp.setData(res); resp.setId(req.getId()); ctx.writeAndFlush(resp); } System.out.println("----------响应结束----------" + req.getData()); } }
4)创建服务端启动类MyServer、ServerApp,启动端口监听;加入编解码器和服务端MyServerHandler
public class MyServer { public static void start(String hostname, int port) throws Exception { ServiceFacatory.init(); EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap().group(bossGroup, workGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.TCP_NODELAY, true) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringEncoder()); pipeline.addLast(new RequestMessageCodec()); pipeline.addLast(new ResponseMessageCodec()); pipeline.addLast(new MyServerHandler()); } }); ChannelFuture future = bootstrap.bind(hostname, port).sync(); future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } } public class ServerApp { public static void main(String[] args) throws Exception { MyServer.start("127.0.0.1", 7000); } }
4、自定义编码解码器
1)创建请求数据编解码器RequestMessageCodec,实现String和请求参数对象RequestData之间互相转换
public class RequestMessageCodec extends MessageToMessageCodec<String, RequestData> { @Override protected void encode(ChannelHandlerContext ctx, RequestData msg, List<Object> out) throws Exception { System.out.println("RequestMessageCodec.encode 被调用 " + msg); String json = JSONObject.toJSONString(msg); out.add(json); } @Override protected void decode(ChannelHandlerContext ctx, String msg, List<Object> out) throws Exception { System.out.println("RequestMessageCodec.decode 被调用&