Building a Dubbo-style framework with a registry center and annotations on Netty 4, step by step


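Before reading this article, it helps to read the related articles in this series first.
1. A detailed look at the underlying principles of network communication in a distributed microservice architecture (illustrated)
2. (Skills for a 600K salary) After five years of work, do you really understand Netty and why you should use it? (in depth)
3. An in-depth analysis of the core components of Netty (diagrams and examples)
4. Details that BAT interviews always ask about: ByteBuf in Netty explained
5. How Netty solves the packet splitting and sticking problem, worked through many practical cases
6. Implementing a custom message communication protocol on Netty (protocol design, parsing, and application)
7. Serialization technology: a detailed and complete analysis with practical application
8. A step-by-step guide to implementing a basic RPC framework on Netty (easy to follow)
This article continues to improve the basic hand-written Netty RPC framework from the previous article. It introduces three things:
  • Spring integration, for annotation-driven configuration
  • ZooKeeper integration, for service registration
  • A load balancing implementation
To get the source code, add the WeChat account 「跟着Mic学架构」 and reply 『rpc』.
Adding annotation-driven support
The modules that change are:
  • netty-rpc-protocol
  • netty-rpc-provider
netty-rpc-protocol
The main classes modified in this module are as follows.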
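Figure 7-1
The changes to the netty-rpc-protocol module are described below.
Adding the annotation
This annotation marks a service as a remote service.

    // TYPE: the annotation can be applied to classes, interfaces (including annotation types), and enums.
    @Target(ElementType.TYPE)
    // RUNTIME: the annotation is retained at runtime, so it can be read through reflection.
    @Retention(RetentionPolicy.RUNTIME)
    @Component
    public @interface GpRemoteService {
    }

SpringRpcProviderBean
This class starts the NettyServer and saves the bean mappings.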

    import java.lang.reflect.Method;
    import java.net.InetAddress;
    import java.net.UnknownHostException;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.BeansException;
    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.beans.factory.config.BeanPostProcessor;

    @Slf4j
    public class SpringRpcProviderBean implements InitializingBean, BeanPostProcessor {

        private final int serverPort;
        private final String serverAddress;

        public SpringRpcProviderBean(int serverPort) throws UnknownHostException {
            this.serverPort = serverPort;
            InetAddress address = InetAddress.getLocalHost();
            this.serverAddress = address.getHostAddress();
        }

        @Override
        public void afterPropertiesSet() throws Exception {
            log.info("begin deploy Netty Server to host {},on port {}", this.serverAddress, this.serverPort);
            // Start the Netty server on a separate thread.
            new Thread(() -> {
                try {
                    new NettyServer(this.serverAddress, this.serverPort).startNettyServer();
                } catch (Exception e) {
                    log.error("start Netty Server Occur Exception,", e);
                }
            }).start();
        }

        // Called after each bean has been initialized.
        @Override
        public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
            // Only beans carrying @GpRemoteService are published.
            if (bean.getClass().isAnnotationPresent(GpRemoteService.class)) {
                Method[] methods = bean.getClass().getDeclaredMethods();
                for (Method method : methods) {
                    // Save the mapping for the published bean.
                    // Key format: <first implemented interface>.<method name>
                    String key = bean.getClass().getInterfaces()[0].getName() + "." + method.getName();
                    BeanMethod beanMethod = new BeanMethod();
                    beanMethod.setBean(bean);
                    beanMethod.setMethod(method);
                    Mediator.beanMethodMap.put(key, beanMethod);
                }
            }
            return bean;
        }
    }

Mediator
Mediator manages the published beans and their invocation.
BeanMethod

    @Data
    public class BeanMethod {
        private Object bean;
        private Method method;
    }

Mediator holds the published beans and invokes them through reflection.

    public class Mediator {

        // Key: <interface name>.<method name>
        public static Map<String, BeanMethod> beanMethodMap = new ConcurrentHashMap<>();

        private static volatile Mediator instance = null;

        private Mediator() {
        }

        // Double-checked locking singleton.
        public static Mediator getInstance() {
            if (instance == null) {
                synchronized (Mediator.class) {
                    if (instance == null) {
                        instance = new Mediator();
                    }
                }
            }
            return instance;
        }

        // Finds the target bean and method for the request and invokes it.
        // Returns null if nothing is published under the key or the call fails.
        public Object processor(RpcRequest rpcRequest) {
            String key = rpcRequest.getClassName() + "." + rpcRequest.getMethodName();
            BeanMethod beanMethod = beanMethodMap.get(key);
            if (beanMethod == null) {
                return null;
            }
            Object bean = beanMethod.getBean();
            Method method = beanMethod.getMethod();
            try {
                return method.invoke(bean, rpcRequest.getParams());
            } catch (IllegalAccessException | InvocationTargetException e) {
                e.printStackTrace();
            }
            return null;
        }
    }

RpcServerProperties
Defines the configuration properties.

    @Data
    @ConfigurationProperties(prefix = "gp.rpc")
    public class RpcServerProperties {
        private int servicePort;
    }

RpcProviderAutoConfiguration
Defines the auto-configuration class.

    @Configuration
    @EnableConfigurationProperties(RpcServerProperties.class)
    public class RpcProviderAutoConfiguration {

        @Bean
        public SpringRpcProviderBean rpcProviderBean(RpcServerProperties rpcServerProperties) throws UnknownHostException {
            return new SpringRpcProviderBean(rpcServerProperties.getServicePort());
        }
    }

Modifying RpcServerHandler
Change the way the call is made: delegate directly to Mediator.

    public class RpcServerHandler extends SimpleChannelInboundHandler<RpcProtocol<RpcRequest>> {

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RpcProtocol<RpcRequest> msg) throws Exception {
            RpcProtocol<RpcResponse> resProtocol = new RpcProtocol<>();
            Header header = msg.getHeader();
            header.setReqType(ReqType.RESPONSE.code());
            // The main change: the call now goes through Mediator.
            Object result = Mediator.getInstance().processor(msg.getContent());
            resProtocol.setHeader(header);
            RpcResponse response = new RpcResponse();
            response.setData(result);
            response.setMsg("success");
            resProtocol.setContent(response);
            ctx.writeAndFlush(resProtocol);
        }
    }

netty-rpc-provider
Two parts of this module change:
  • application.properties
  • NettyRpcProviderMain
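Before moving on to the provider module, the publish-and-dispatch idea from the protocol module can be tried in isolation. The following is a minimal JDK-only sketch, not the article's code: `RemoteService`, `GreetingService`, `Published`, `publish`, and `dispatch` are hypothetical stand-ins for `GpRemoteService`, a service interface, `BeanMethod`, the `BeanPostProcessor` hook, and `Mediator.processor`. It assumes the same key convention as above, `<interface name>.<method name>`, which means overloaded methods would overwrite each other.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// JDK-only sketch of "publish annotated beans, then dispatch by key".
// All names are hypothetical stand-ins for the article's classes.
final class PublishDispatchSketch {

    @Retention(RetentionPolicy.RUNTIME) // must be RUNTIME, or isAnnotationPresent() returns false
    @Target(ElementType.TYPE)
    @interface RemoteService {
    }

    interface GreetingService {
        String greet(String name);
    }

    @RemoteService
    static class GreetingServiceImpl implements GreetingService {
        @Override
        public String greet(String name) {
            return "Hello, " + name;
        }
    }

    // Pairs a bean with one of its methods, the role BeanMethod plays in the article.
    static final class Published {
        final Object bean;
        final Method method;

        Published(Object bean, Method method) {
            this.bean = bean;
            this.method = method;
        }
    }

    // Key format: "<interface name>.<method name>"
    static final Map<String, Published> PUBLISHED = new ConcurrentHashMap<>();

    // Registers every interface method of an annotated bean; other beans are ignored.
    static void publish(Object bean) {
        Class<?> type = bean.getClass();
        if (!type.isAnnotationPresent(RemoteService.class)) {
            return;
        }
        for (Class<?> iface : type.getInterfaces()) {
            for (Method method : iface.getMethods()) {
                PUBLISHED.put(iface.getName() + "." + method.getName(), new Published(bean, method));
            }
        }
    }

    // Looks up the target by key and invokes it reflectively.
    static Object dispatch(String className, String methodName, Object... params) {
        Published target = PUBLISHED.get(className + "." + methodName);
        if (target == null) {
            throw new IllegalArgumentException("no published service for " + className + "." + methodName);
        }
        try {
            return target.method.invoke(target.bean, params);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("invocation failed", e);
        }
    }

    public static void main(String[] args) {
        publish(new GreetingServiceImpl());
        publish("a plain String is not annotated, so it is skipped");
        System.out.println(dispatch(GreetingService.class.getName(), "greet", "Mic")); // prints "Hello, Mic"
    }
}
```

Unlike the article's version, this sketch walks every implemented interface instead of only the first one, and it throws on an unknown key instead of returning null. Both are design choices made for the sketch, not changes the framework requires.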
NettyRpcProviderMain

    @ComponentScan(basePackages = {"com.example.spring.annotation", "com.example.spring.service", "com.example.service"})
    @SpringBootApplication
    public class NettyRpcProviderMain {

        public static void main(String[] args) throws Exception {
            SpringApplication.run(NettyRpcProviderMain.class, args);
            // The manual instantiation that used to be here has been removed.
        }
    }

application.properties
Add one configuration property.

    gp.rpc.servicePort=20880

UserServiceImpl
Publish the current service.

    @GpRemoteService // publishes this service as a remote service
    @Slf4j
    public class UserServiceImpl implements IUserService {

        @Override
        public String saveUser(String name) {
            log.info("begin saveUser:" + name);
            return "Save User Success!";
        }
    }

Making the client annotation-driven
The client also needs to reference services through an annotation, so that the details of remote communication are hidden completely. The code structure is shown in Figure 7-2.
Figure 7-2
Adding the client annotation
Create the following annotation in the annotation directory of the netty-rpc-protocol module.

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    @Autowired
    public @interface GpRemoteReference {
    }

SpringRpcReferenceBean
Defines a factory bean that builds the proxy used for remote communication.

    public class SpringRpcReferenceBean implements FactoryBean<Object> {

        private Class<?> interfaceClass;
        private Object object;
        private String serviceAddress;
        private int servicePort;

        @Override
        public Object getObject() throws Exception {
            return object;
        }

        // Init method: builds a JDK dynamic proxy for the referenced interface.
        public void init() {
            this.object = Proxy.newProxyInstance(
                    this.interfaceClass.getClassLoader(),
                    new Class<?>[]{this.interfaceClass},
                    new RpcInvokerProxy(this.serviceAddress, this.servicePort));
        }

        @Override
        public Class<?> getObjectType() {
            return this.interfaceClass;
        }

        public void setInterfaceClass(Class<?> interfaceClass) {
            this.interfaceClass = interfaceClass;
        }

        public void setServiceAddress(String serviceAddress) {
            this.serviceAddress = serviceAddress;
        }

        public void setServicePort(int servicePort) {
            this.servicePort = servicePort;
        }
    }

SpringRpcReferencePostProcessor
This class injects the dynamic proxies for remote beans. It implements three Spring interfaces:
  • BeanClassLoaderAware: obtains the bean class loader
  • BeanFactoryPostProcessor: runs after the Spring container has loaded the bean definitions and before any bean is instantiated
  • ApplicationContextAware: obtains the ApplicationContext
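
    import java.lang.reflect.Field;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.BeansException;
    import org.springframework.beans.factory.BeanClassLoaderAware;
    import org.springframework.beans.factory.config.BeanDefinition;
    import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
    import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
    import org.springframework.beans.factory.support.BeanDefinitionBuilder;
    import org.springframework.beans.factory.support.BeanDefinitionRegistry;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.ApplicationContextAware;
    import org.springframework.core.annotation.AnnotationUtils;
    import org.springframework.util.ClassUtils;
    import org.springframework.util.ReflectionUtils;

    @Slf4j
    public class SpringRpcReferencePostProcessor implements ApplicationContextAware, BeanClassLoaderAware, BeanFactoryPostProcessor {

        private ApplicationContext context;
        private ClassLoader classLoader;
        private RpcClientProperties clientProperties;

        // Bean definitions for the remote references found during scanning.
        private final Map<String, BeanDefinition> rpcRefBeanDefinitions = new ConcurrentHashMap<>();

        public SpringRpcReferencePostProcessor(RpcClientProperties clientProperties) {
            this.clientProperties = clientProperties;
        }

        @Override
        public void setBeanClassLoader(ClassLoader classLoader) {
            this.classLoader = classLoader;
        }

        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            this.context = applicationContext;
        }

        @Override
        public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
            for (String beanDefinitionName : beanFactory.getBeanDefinitionNames()) {
                // Walk the bean definitions, load each bean class, and check whether any of its
                // fields carries @GpRemoteReference. If so, a dynamic proxy has to be built for it.
                BeanDefinition beanDefinition = beanFactory.getBeanDefinition(beanDefinitionName);
                String beanClassName = beanDefinition.getBeanClassName();
                if (beanClassName != null) {
                    // Equivalent to Class.forName, which it calls internally.
                    Class<?> clazz = ClassUtils.resolveClassName(beanClassName, this.classLoader);
                    // Create a bean definition for each annotated field of this class.
                    ReflectionUtils.doWithFields(clazz, this::parseRpcReference);
                }
            }
            // Register a proxy-producing bean for every field annotated with @GpRemoteReference.
            BeanDefinitionRegistry registry = (BeanDefinitionRegistry) beanFactory;
            this.rpcRefBeanDefinitions.forEach((beanName, beanDefinition) -> {
                if (context.containsBean(beanName)) {
                    log.warn("SpringContext already register bean {}", beanName);
                    return;
                }
                // Register the dynamically created bean definition with the container.
                registry.registerBeanDefinition(beanName, beanDefinition);
                log.info("registered RpcReferenceBean {} success.", beanName);
            });
        }

        private void parseRpcReference(Field field) {
            GpRemoteReference gpRemoteReference = AnnotationUtils.getAnnotation(field, GpRemoteReference.class);
            if (gpRemoteReference != null) {
                BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(SpringRpcReferenceBean.class);
                builder.setInitMethodName(RpcConstant.INIT_METHOD_NAME);
                builder.addPropertyValue("interfaceClass", field.getType());
                builder.addPropertyValue("serviceAddress", clientProperties.getServiceAddress());
                builder.addPropertyValue("servicePort", clientProperties.getServicePort());
                BeanDefinition beanDefinition = builder.getBeanDefinition();
                // The field name becomes the bean name.
                rpcRefBeanDefinitions.put(field.getName(), beanDefinition);
            }
        }
    }

An INIT_METHOD_NAME constant needs to be added to RpcConstant.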

    public class RpcConstant {
        // Total number of bytes in the header
        public final static int HEAD_TOTAL_LEN = 16;
        // Magic number
        public final static short MAGIC = 0xca;
        public static final String INIT_METHOD_NAME = "init";
    }

RpcClientProperties

    @Data
    public class RpcClientProperties {
        private String serviceAddress = "192.168.1.102";
        private int servicePort = 20880;
    }

RpcRefernceAutoConfiguration

    @Configuration
    public class RpcRefernceAutoConfiguration implements EnvironmentAware {

        private Environment environment;

        @Bean
        public SpringRpcReferencePostProcessor postProcessor() {
            // Read the client settings straight from the Environment.
            String address = environment.getProperty("gp.serviceAddress");
            int port = Integer.parseInt(environment.getProperty("gp.servicePort"));
            RpcClientProperties rc = new RpcClientProperties();
            rc.setServiceAddress(address);
            rc.setServicePort(port);
            return new SpringRpcReferencePostProcessor(rc);
        }

        @Override
        public void setEnvironment(Environment environment) {
            this.environment = environment;
        }
    }

netty-rpc-consumer
Modify the netty-rpc-consumer module:
  • Turn the module into a Spring Boot project
  • Add the web dependency
  • Add a test class

Figure 7-3 The netty-rpc-consumer module
Add the jar dependencies

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

HelloController

    @RestController
    public class HelloController {

        @GpRemoteReference
        private IUserService userService;

        @GetMapping("/test")
        public String test() {
            return userService.saveUser("Mic");
        }
    }

NettyConsumerMain

    @ComponentScan(basePackages = {"com.example.spring.annotation", "com.example.controller", "com.example.spring.reference"})
    @SpringBootApplication
    public class NettyConsumerMain {

        public static void main(String[] args) {
            SpringApplication.run(NettyConsumerMain.class, args);
        }
    }

application.properties
The keys must match the ones that RpcRefernceAutoConfiguration reads, gp.serviceAddress and gp.servicePort.

    gp.serviceAddress=192.168.1.102
    gp.servicePort=20880

Access test
  • Start Netty-Rpc-Server
  • Start Netty-Rpc-Consumer
If both start without problems, call HelloController to test access to the remote service.
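What happens behind `userService.saveUser("Mic")` in that test can be sketched without Spring or Netty. The following is a minimal JDK-only illustration, not the article's code: `refer`, `Call`, and the `Function`-based transport are hypothetical names that stand in for `SpringRpcReferenceBean.init()`, `RpcRequest`, and the Netty client. It shows the one idea the reference bean depends on, namely that a JDK dynamic proxy turns an interface call into a class name, a method name, and an argument array that can be shipped elsewhere.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.function.Function;

// JDK-only sketch of a remote-reference proxy; all names are hypothetical.
final class ReferenceProxySketch {

    interface UserService {
        String saveUser(String name);
    }

    // What the proxy captures for one call, roughly the role RpcRequest plays.
    static final class Call {
        final String className;
        final String methodName;
        final Object[] params;

        Call(String className, String methodName, Object[] params) {
            this.className = className;
            this.methodName = methodName;
            this.params = params == null ? new Object[0] : params;
        }
    }

    // Builds a proxy for iface; every interface call is handed to transport as a Call.
    static <T> T refer(Class<T> iface, Function<Call, Object> transport) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getDeclaringClass() == Object.class) {
                // toString/hashCode/equals stay local instead of going over the wire.
                switch (method.getName()) {
                    case "toString":
                        return "proxy for " + iface.getName();
                    case "hashCode":
                        return System.identityHashCode(proxy);
                    default: // equals
                        return proxy == args[0];
                }
            }
            return transport.apply(
                    new Call(method.getDeclaringClass().getName(), method.getName(), args));
        };
        return iface.cast(
                Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[]{iface}, handler));
    }

    public static void main(String[] args) {
        // Stand-in transport: a real one would serialize the Call and send it to the provider.
        Function<Call, Object> fakeTransport = call ->
                call.className + "." + call.methodName + Arrays.toString(call.params);
        UserService userService = refer(UserService.class, fakeTransport);
        System.out.println(userService.saveUser("Mic"));
    }
}
```

Keeping the transport behind a plain function makes it easy to swap later, which is essentially what the next section does when a fixed address is replaced by a registry lookup.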
Introducing a registry center
Create a netty-rpc-registry module. Its code structure is shown in Figure 7-4.
Add the dependencies

    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>4.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>4.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-x-discovery</artifactId>
        <version>4.2.0</version>
    </dependency>

IRegistryService

    public interface IRegistryService {

        /**
         * Registers a service.
         */
        void register(ServiceInfo serviceInfo) throws Exception;

        /**
         * Cancels a registration.
         */
        void unRegister(ServiceInfo serviceInfo) throws Exception;

        /**
         * Discovers a service dynamically by name.
         */
        ServiceInfo discovery(String serviceName) throws Exception;
    }

ServiceInfo

    @Data
    public class ServiceInfo {
        private String serviceName;
        private String serviceAddress;
        private int servicePort;
    }

ZookeeperRegistryService

    import java.util.ArrayList;
    import java.util.Collection;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.curator.x.discovery.ServiceDiscovery;
    import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
    import org.apache.curator.x.discovery.ServiceInstance;
    import org.apache.curator.x.discovery.details.JsonInstanceSerializer;

    @Slf4j
    public class ZookeeperRegistryService implements IRegistryService {

        private static final String REGISTRY_PATH = "/registry";

        // Curator's wrapper for service registration and discovery. It abstracts the problem into
        // three interfaces (ServiceInstance, ServiceProvider, ServiceDiscovery), which makes
        // service discovery easy to implement.
        private final ServiceDiscovery<ServiceInfo> serviceDiscovery;

        private ILoadBalance<ServiceInstance<ServiceInfo>> loadBalance;

        public ZookeeperRegistryService(String registryAddress) throws Exception {
            CuratorFramework client = CuratorFrameworkFactory.newClient(registryAddress, new ExponentialBackoffRetry(1000, 3));
            // The client must be started before ServiceDiscovery can use it.
            client.start();
            JsonInstanceSerializer<ServiceInfo> serializer = new JsonInstanceSerializer<>(ServiceInfo.class);
            this.serviceDiscovery = ServiceDiscoveryBuilder.builder(ServiceInfo.class)
                    .client(client)
                    .serializer(serializer)
                    .basePath(REGISTRY_PATH)
                    .build();
            this.serviceDiscovery.start();
            loadBalance = new RandomLoadBalance();
        }

        @Override
        public void register(ServiceInfo serviceInfo) throws Exception {
            log.info("begin registering service, {}", serviceInfo);
            ServiceInstance<ServiceInfo> serviceInstance = ServiceInstance.<ServiceInfo>builder()
                    .name(serviceInfo.getServiceName())
                    .address(serviceInfo.getServiceAddress())
                    .port(serviceInfo.getServicePort())
                    .payload(serviceInfo)
                    .build();
            serviceDiscovery.registerService(serviceInstance);
        }

        @Override
        public void unRegister(ServiceInfo serviceInfo) throws Exception {
            ServiceInstance<ServiceInfo> serviceInstance = ServiceInstance.<ServiceInfo>builder()
                    .name(serviceInfo.getServiceName())
                    .address(serviceInfo.getServiceAddress())
                    .port(serviceInfo.getServicePort())
                    .payload(serviceInfo)
                    .build();
            serviceDiscovery.unregisterService(serviceInstance);
        }

        @Override
        public ServiceInfo discovery(String serviceName) throws Exception {
            Collection<ServiceInstance<ServiceInfo>> serviceInstances = serviceDiscovery.queryForInstances(serviceName);
            // Let the load balancer pick one concrete instance. Copying into a list avoids
            // casting the returned Collection to List.
            ServiceInstance<ServiceInfo> serviceInstance = loadBalance.select(new ArrayList<>(serviceInstances));
            if (serviceInstance != null) {
                return serviceInstance.getPayload();
            }
            return null;
        }
    }

Introducing a load balancing algorithm
Service discovery may return several provider instances for one service, so a load balancing algorithm is needed to pick one of them.
ILoadBalance

    public interface ILoadBalance<T> {
        T select(List<T> servers);
    }

AbstractLoadBalance

    public abstract class AbstractLoadBalance implements ILoadBalance<ServiceInstance<ServiceInfo>> {

        @Override
        public ServiceInstance<ServiceInfo> select(List<ServiceInstance<ServiceInfo>> servers) {
            if (servers == null || servers.size() == 0) {
                return null;
            }
            if (servers.size() == 1) {
                return servers.get(0);
            }
            return doSelect(servers);
        }

        // Subclasses implement the actual strategy; servers holds at least two entries here.
        protected abstract ServiceInstance<ServiceInfo> doSelect(List<ServiceInstance<ServiceInfo>> servers);
    }

RandomLoadBalance

    public class RandomLoadBalance extends AbstractLoadBalance {

        @Override
        protected ServiceInstance<ServiceInfo> doSelect(List<ServiceInstance<ServiceInfo>> servers) {
            int length = servers.size();
            Random random = new Random();
            return servers.get(random.nextInt(length));
        }
    }

RegistryType

    public enum RegistryType {

        ZOOKEEPER((byte) 0),
        EUREKA((byte) 1);

        private byte code;

        RegistryType(byte code) {
            this.code = code;
        }

        public byte code() {
            return this.code;
        }

        // Returns null when no registry type matches the code.
        public static RegistryType findByCode(byte code) {
            for (RegistryType rt : RegistryType.values()) {
                if (rt.code() == code) {
                    return rt;
                }
            }
            return null;
        }
    }

RegistryFactory

    public class RegistryFactory {

        // Returns null if the registry cannot be created. EUREKA is not implemented yet.
        public static IRegistryService createRegistryService(String address, RegistryType registryType) {
            IRegistryService registryService = null;
            try {
                switch (registryType) {
                    case ZOOKEEPER:
                        registryService = new ZookeeperRegistryService(address);
                        break;
                    case EUREKA:
                        // TODO
                        break;
                    default:
                        registryService = new ZookeeperRegistryService(address);
                        break;
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            return registryService;
        }
    }

Modifying the server to register services
Modify the netty-rpc-protocol module to add registry support.
SpringRpcProviderBean
The parts to change are marked with comments in the code below.

    @Slf4j
    public class SpringRpcProviderBean implements InitializingBean, BeanPostProcessor {

        private final int serverPort;
        private final String serverAddress;
        private final IRegistryService registryService; // changed: registry added

        public SpringRpcProviderBean(int serverPort, IRegistryService registryService) throws UnknownHostException {
            this.serverPort = serverPort;
            InetAddress address = InetAddress.getLocalHost();
            this.serverAddress = address.getHostAddress();
            this.registryService = registryService; // changed: registry added
        }

        @Override
        public void afterPropertiesSet() throws Exception {
            log.info("begin deploy Netty Server to host {},on port {}", this.serverAddress, this.serverPort);
            new Thread(() -> {
                try {
                    new NettyServer(this.serverAddress, this.serverPort).startNettyServer();
                } catch (Exception e) {
                    log.error("start Netty Server Occur Exception,", e);
                }
            }).start();
        }

        @Override
        public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
            // Only beans carrying @GpRemoteService are published.
            if (bean.getClass().isAnnotationPresent(GpRemoteService.class)) {
                String serviceName = bean.getClass().getInterfaces()[0].getName();
                Method[] methods = bean.getClass().getDeclaredMethods();
                for (Method method : methods) {
                    String key = serviceName + "." + method.getName();
                    BeanMethod beanMethod = new BeanMethod();
                    beanMethod.setBean(bean);
                    beanMethod.setMethod(method);
                    Mediator.beanMethodMap.put(key, beanMethod);
                }
                // Changed: register the service with the registry. This happens once per bean,
                // outside the method loop, so that a service with several methods does not
                // create several registry entries.
                try {
                    ServiceInfo serviceInfo = new ServiceInfo();
                    serviceInfo.setServiceAddress(this.serverAddress);
                    serviceInfo.setServicePort(this.serverPort);
                    serviceInfo.setServiceName(serviceName);
                    registryService.register(serviceInfo);
                } catch (Exception e) {
                    log.error("register service {} failed", serviceName, e);
                }
            }
            return bean;
        }
    }

RpcServerProperties
Modify RpcServerProperties to add the registry configuration.

    @Data
    @ConfigurationProperties(prefix = "gp.rpc")
    public class RpcServerProperties {
        private int servicePort;
        private byte registerType;
        private String registryAddress;
    }

RpcProviderAutoConfiguration
Inject the registry.

    @Configuration
    @EnableConfigurationProperties(RpcServerProperties.class)
    public class RpcProviderAutoConfiguration {

        @Bean
        public SpringRpcProviderBean rpcProviderBean(RpcServerProperties rpcServerProperties) throws UnknownHostException {
            // Added: create the registry service.
            IRegistryService registryService = RegistryFactory.createRegistryService(
                    rpcServerProperties.getRegistryAddress(),
                    RegistryType.findByCode(rpcServerProperties.getRegisterType()));
            return new SpringRpcProviderBean(rpcServerProperties.getServicePort(), registryService);
        }
    }

application.properties
Modify application.properties in netty-rpc-provider.

    gp.rpc.servicePort=20880
    gp.rpc.registerType=0
    gp.rpc.registryAddress=192.168.221.128:2181

Modifying the client to add service discovery
The client needs more changes. All of the classes modified below belong to the netty-rpc-protocol module.
RpcClientProperties
Add options for the registry type and the registry address.

    @Data
    public class RpcClientProperties {
        private String serviceAddress = "192.168.1.102";
        private int servicePort = 20880;
        private byte registryType;
        private String registryAddress;
    }

Modifying NettyClient
The address used to be static. It is now obtained from the registry.
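
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import lombok.extern.slf4j.Slf4j;

    @Slf4j
    public class NettyClient {

        private final Bootstrap bootstrap;
        private final EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        /* private String serviceAddress;
        private int servicePort; */

        public NettyClient() {
            log.info("begin init NettyClient");
            bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new RpcClientInitializer());
            /* this.serviceAddress = serviceAddress;
            this.servicePort = servicePort; */
        }

        public void sendRequest(RpcProtocol<RpcRequest> protocol, IRegistryService registryService) throws Exception {
            // Look up a provider for the requested interface in the registry.
            ServiceInfo serviceInfo = registryService.discovery(protocol.getContent().getClassName());
            if (serviceInfo == null) {
                // discovery returns null when no instance is registered.
                throw new IllegalStateException("no provider found for " + protocol.getContent().getClassName());
            }
            ChannelFuture future = bootstrap.connect(serviceInfo.getServiceAddress(), serviceInfo.getServicePort()).sync();
            future.addListener(listener -> {
                if (future.isSuccess()) {
                    log.info("connect rpc server {} success.", serviceInfo.getServiceAddress());
                } else {
                    log.error("connect rpc server {} failed .", serviceInfo.getServiceAddress());
                    future.cause().printStackTrace();
                    eventLoopGroup.shutdownGracefully();
                }
            });
            log.info("begin transfer data");
            future.channel().writeAndFlush(protocol);
        }
    }

Modifying RpcInvokerProxy
Replace the static address and port with an IRegistryService.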

    @Slf4j
    public class RpcInvokerProxy implements InvocationHandler {

        /* private String serviceAddress;
        private int servicePort; */
        IRegistryService registryService;

        public RpcInvokerProxy(IRegistryService registryService) {
            /* this.serviceAddress = serviceAddress;
            this.servicePort = servicePort; */
            this.registryService = registryService;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            log.info("begin invoke target server");
            // Assemble the request.
            RpcProtocol<RpcRequest> protocol = new RpcProtocol<>();
            long requestId = RequestHolder.REQUEST_ID.incrementAndGet();
            Header header = new Header(RpcConstant.MAGIC, SerialType.JSON_SERIAL.code(), ReqType.REQUEST.code(), requestId, 0);
            protocol.setHeader(header);
            RpcRequest request = new RpcRequest();
            request.setClassName(method.getDeclaringClass().getName());
            request.setMethodName(method.getName());
            request.setParameterTypes(method.getParameterTypes());
            request.setParams(args);
            protocol.setContent(request);
            // Send the request.
            NettyClient nettyClient = new NettyClient();
            // Register a future under the request id so the response can be matched to it.
            RpcFuture<RpcResponse> future = new RpcFuture<>(new DefaultPromise<>(new DefaultEventLoop()));
            RequestHolder.REQUEST_MAP.put(requestId, future);
            nettyClient.sendRequest(protocol, this.registryService);
            // Block until the response arrives.
            return future.getPromise().get().getData();
        }
    }

SpringRpcReferenceBean
Modify the reference bean to add the registry configuration.

    public class SpringRpcReferenceBean implements FactoryBean<Object> {

        private Class<?> interfaceClass;
        private Object object;
        /* private String serviceAddress;
        private int servicePort; */
        // Changed: registry settings added.
        private byte registryType;
        private String registryAddress;

        @Override
        public Object getObject() throws Exception {
            return object;
        }

        public void init() {
            // Changed: create the registry service and hand it to the proxy.
            IRegistryService registryService = RegistryFactory.createRegistryService(
                    this.registryAddress, RegistryType.findByCode(this.registryType));
            this.object = Proxy.newProxyInstance(
                    this.interfaceClass.getClassLoader(),
                    new Class<?>[]{this.interfaceClass},
                    new RpcInvokerProxy(registryService));
        }

        @Override
        public Class<?> getObjectType() {
            return this.interfaceClass;
        }

        public void setInterfaceClass(Class<?> interfaceClass) {
            this.interfaceClass = interfaceClass;
        }

        /* public void setServiceAddress(String serviceAddress) {
            this.serviceAddress = serviceAddress;
        }
        public void setServicePort(int servicePort) {
            this.servicePort = servicePort;
        } */

        public void setRegistryType(byte registryType) {
            this.registryType = registryType;
        }

        public void setRegistryAddress(String registryAddress) {
            this.registryAddress = registryAddress;
        }
    }

SpringRpcReferencePostProcessor

    @Slf4j
    public class SpringRpcReferencePostProcessor implements ApplicationContextAware, BeanClassLoaderAware, BeanFactoryPostProcessor {

        private ApplicationContext context;
        private ClassLoader classLoader;
        private RpcClientProperties clientProperties;

        // Bean definitions for the remote references found during scanning.
        private final Map<String, BeanDefinition> rpcRefBeanDefinitions = new ConcurrentHashMap<>();

        public SpringRpcReferencePostProcessor(RpcClientProperties clientProperties) {
            this.clientProperties = clientProperties;
        }

        @Override
        public void setBeanClassLoader(ClassLoader classLoader) {
            this.classLoader = classLoader;
        }

        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            this.context = applicationContext;
        }

        @Override
        public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
            for (String beanDefinitionName : beanFactory.getBeanDefinitionNames()) {
                // Walk the bean definitions, load each bean class, and check whether any of its
                // fields carries @GpRemoteReference. If so, a dynamic proxy has to be built for it.
                BeanDefinition beanDefinition = beanFactory.getBeanDefinition(beanDefinitionName);
                String beanClassName = beanDefinition.getBeanClassName();
                if (beanClassName != null) {
                    Class<?> clazz = ClassUtils.resolveClassName(beanClassName, this.classLoader);
                    ReflectionUtils.doWithFields(clazz, this::parseRpcReference);
                }
            }
            // Register a proxy-producing bean for every field annotated with @GpRemoteReference.
            BeanDefinitionRegistry registry = (BeanDefinitionRegistry) beanFactory;
            this.rpcRefBeanDefinitions.forEach((beanName, beanDefinition) -> {
                if (context.containsBean(beanName)) {
                    log.warn("SpringContext already register bean {}", beanName);
                    return;
                }
                registry.registerBeanDefinition(beanName, beanDefinition);
                log.info("registered RpcReferenceBean {} success.", beanName);
            });
        }

        private void parseRpcReference(Field field) {
            GpRemoteReference gpRemoteReference = AnnotationUtils.getAnnotation(field, GpRemoteReference.class);
            if (gpRemoteReference != null) {
                BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(SpringRpcReferenceBean.class);
                builder.setInitMethodName(RpcConstant.INIT_METHOD_NAME);
                builder.addPropertyValue("interfaceClass", field.getType());
                /* builder.addPropertyValue("serviceAddress", clientProperties.getServiceAddress());
                builder.addPropertyValue("servicePort", clientProperties.getServicePort()); */
                // Changed: pass the registry settings instead of a fixed address and port.
                builder.addPropertyValue("registryType", clientProperties.getRegistryType());
                builder.addPropertyValue("registryAddress", clientProperties.getRegistryAddress());
                BeanDefinition beanDefinition = builder.getBeanDefinition();
                rpcRefBeanDefinitions.put(field.getName(), beanDefinition);
            }
        }
    }

RpcRefernceAutoConfiguration

    @Configuration
    public class RpcRefernceAutoConfiguration implements EnvironmentAware {

        private Environment environment;

        @Bean
        public SpringRpcReferencePostProcessor postProcessor() {
            String address = environment.getProperty("gp.serviceAddress");
            int port = Integer.parseInt(environment.getProperty("gp.servicePort"));
            RpcClientProperties rc = new RpcClientProperties();
            rc.setServiceAddress(address);
            rc.setServicePort(port);
            // Added: read the registry settings.
            rc.setRegistryType(Byte.parseByte(environment.getProperty("gp.registryType")));
            rc.setRegistryAddress(environment.getProperty("gp.registryAddress"));
            return new SpringRpcReferencePostProcessor(rc);
        }

        @Override
        public void setEnvironment(Environment environment) {
            this.environment = environment;
        }
    }

application.properties
Modify the configuration in the netty-rpc-consumer module.
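
    gp.serviceAddress=192.168.1.102
    gp.servicePort=20880
    gp.registryType=0
    gp.registryAddress=192.168.221.128:2181

Testing load balancing
Add a second startup class for the server and give it a different port. Then refresh the browser a few times, without restarting the client, to see load balancing take effect.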
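With random selection, a handful of browser refreshes can easily hit the same provider several times in a row. If a more even spread is wanted, a round-robin strategy is a common alternative. The following is a minimal JDK-only sketch, not part of the article's code: `LoadBalance` mirrors the shape of `ILoadBalance`, and `RoundRobinLoadBalance` is a hypothetical class. In the real project it would presumably extend the abstract base class and work on `ServiceInstance<ServiceInfo>`, and plain strings stand in for provider addresses here.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// JDK-only sketch of a round-robin strategy; all names are hypothetical.
final class RoundRobinSketch {

    // Same shape as the article's ILoadBalance<T>.
    interface LoadBalance<T> {
        T select(List<T> servers);
    }

    static final class RoundRobinLoadBalance<T> implements LoadBalance<T> {

        // Shared counter, so concurrent callers still advance through the list.
        private final AtomicInteger next = new AtomicInteger();

        @Override
        public T select(List<T> servers) {
            if (servers == null || servers.isEmpty()) {
                return null;
            }
            // floorMod keeps the index non-negative even after the counter overflows.
            int index = Math.floorMod(next.getAndIncrement(), servers.size());
            return servers.get(index);
        }
    }

    public static void main(String[] args) {
        LoadBalance<String> loadBalance = new RoundRobinLoadBalance<>();
        // Two made-up provider addresses on different ports.
        List<String> providers = Arrays.asList("192.168.1.102:20880", "192.168.1.102:20881");
        for (int i = 0; i < 4; i++) {
            System.out.println(loadBalance.select(providers));
        }
    }
}
```

The counter lives in the load balancer instance, so the rotation only holds if the same instance is reused across calls. That fits the article's design, where ZookeeperRegistryService creates its load balancer once in the constructor.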
基于儿童优先发展的幼儿园一日生活保育 基于Netty4手把手实现一个带注册中心和注解的Dubbo框架

文章插图
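Figure 7-5
If you need the source code, follow the official account [跟着Mic学架构] and reply with the keyword [rpc].
Copyright notice: unless otherwise stated, all articles on this blog are licensed under CC BY-NC-SA 4.0. Please credit Mic带你学架构 when reposting.
If this article helped you, please follow and like; your support is what keeps me writing. Follow the official account 「跟着Mic学架构」 for more technical content.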
