Spring Cloud Eureka源码分析.pdf

所需积分/C币:22 2019-10-08 11:05:49 1.08MB PDF

为什么要看源码: 1、提升技术功底:学习源码里的优秀设计思想,比如一些疑难问题的解决思路,还有一些优秀的设计模式,整体提 升自己的技术功底 2、深度掌握技术框架:源码看多了,对于一个新技术或框架的掌握速度会有大幅提升,看下框架demo大致就能知 道底层的实现,技术框架更新再快也不怕 3、快速定位线上问题:遇到线上问题,特别是框架源码里的问题(比如bug),能够快速定位,这就是相比其他没看过 源码的人的优势 4、对面试大有裨益:面试一线互联网公司对于框架技术一般都会问到源码级别的实现 5、技术追求:对技术有追求的人必做之事,使用了一个好的框架,很想知道底层是如何实现的
3 return new InstanceRegistry( this, eurekaServerConfig, this, eurekaClientConfig 24 servercodecs, this eurekaclient 25 this. instanceRegistry Properties getExpectedNumberofRenews PerMin(), 6 this. instanceRegistry Properties getDefaultopenForTrafficcount()); 9/配置服务节点信息,这里的作用主要是为了配置 Eureka的peer节点,也就是说当有收到有节点注册上来 30//的时候,需要通知给那些服务节点,(互为一个集群) 81(@Bean 32 @ConditionalOnMissing Bean 03 public Peer EurekaNodes peer EurekaNodes( Peer AwareInstanceRegistry registry, 34 ServerCodecs server Codecs)t 35 return new Peer EurekaNodes (registry, this, eurekaServer Config, 36 this. eurekaClient Config, server Codecs, this application InfoManager ) 37} 38// Eurekaserver的上下文 89 bEan 40 public EurekaServer Context eurekaServerCantext ServerCodecs servercodecs 41 PeerAwareInstance Registry registry, Peer EurekaNodes peerEurekaNodes)[ 42 return new DefaultEurekaServerContext(this eurekaServerConfig, server Codecs 43 registry, peer EurekaNodes, this applicationInfoManager); 45//这个类的作用是 spring-c1oud和原牛 eureka的胶水代码,通过这个类来启动 Eurekasever 46//后面这个类会在 EurekaServer InitializerConfiguration被调用,进行 eureka启动 17 @Bean 48 public EurekaServer Bootstrap eurekaServerBoot strap( Peer AwareInstanceRegistry registry, 49 EurekaServer Context server Context )t return new EurekaServer Bootstrap( this applicationInfoManager, 51 this. eurekaClient Config, this, eurekaServerConfig, registry, 52 serverContext); } 54//置拦截器, Servletcontainer里面实现了 Jersey框架,通过他来实现 eurekaserver对外的 restful11接口 55 bEan 6 public FilterRegistrationBean jersey FilterRegistration( 57 javax. wsrs core Application eurekaJerseyApp)[ 58 FilterRegistrationBean bean new FilterRegistration Bean( 9 bean. setFilter(new Servlet Container (eurekaJerseyApp)); 60 bean setorder(Ordered LOWEST PRECEDENCE) 61 bean. setUrlPatterns 2 Collections. singletonList( EurekaConstants. DEFAULT PREFIX +/*)3 64 return bean Eureka ServerAuto Configurations=EurekaserverlnitializerConfiguration 2*@author Dave Syer 4 aConfiguration @Commons Log 6 public class EurekaServerInitializerconfiguration 7 implements ServletContextAware, SmartLifecycle, Ordered t g QAutowired 10 private EurekaServerConfig eureka ServerConfig; 12 private Servlet context servletcontext 14(@Autowired 15 private ApplicationContext application Context; 17 QAutowired 18 private EurekaServerBootstrap eurekaServer Bootstrap: 20 private boolean running private int order @override 25 public void setservletContext (ServletContext servletContext)t 26 this, servletContext servlet context 29 Override 30 public void start()i 31//启动一个线程 32 new Thread(new Runnable()( 33 Override public void run( 5 tr 36//初始化 Eurekaserver,同时启动 Eureka server 37 eurekaServer Bootstrap contextInitialized( EurekaServerInitializer Configuration this servletConte t) 38 log.info("Started Eureka Server"); 9//发布 Eure kaserver的注册享件 40 publish(new EurekaRegistryAvailableEvent (getEureka ServerConfig()); 41//设置启动的状态为true 42 EurekaServerInitializer Configuration. this running=true; 43//发送 Eureka start事件,其他还有各种事件,我们可以监听这神时间,然后做一些特定的业务需求 44 publish(new EurekaServerStartedEvent(get EurekaServerConfigo)); 45 46 catch (Exception ex)t 47 // Help! 48 log error(" Could not initialize Eureka servlet context, ex); 51 ).start() 51 private EurekaServerconfig getEurekaserverconfigo f E5 return this. eurekaServer Config: 58 private void publish(ApplicationEvent event)( 9 this applicationContext. publishEvent(event); 62 Override 63 public void stop()[ 64 this running false: 65 eurekaServerBootstrap contextDestroyed( this servletContext) 68 Override 69 public boolean isRunning( t 70 return this running @override 74 public int getPhase(t 5 return 0; 78@override 79 public boolean isAutostartup([ 8o return true; @Override 4 public void stop(Runnable callback)t 85 callback run (; 88 Override 89 public int getorder() t co return this, order j EurekaServer Bootstrap的 Icontextlnitializeo初始化方法 1//初始化 Eurekaserver的运行环境和上下文 2 public void contextInitialized( ServletContext context)( 3 try t 4 initEurekaEnvironmento; 5 initEurekaServer o: 7 context. setAttribute( EurekaServer Context class. getNameo, this, serverContext); g catch (Throwable e)f 10 log, error("Cannot bootstrap eureka server :",e)j I1 throw new Runtime Exception("Cannot bootstrap eureka server :",e) 15初始化 Eurekaserver的上下文 16 protected void initEurekaServer Context( throws Exception t 17 //For backward compatibility JsonXStream. getInstance(). registerConverter (new V1AwareInstanceInfoConverter(), 19 XStream PRIORITY VERY HIGH) 20 XmIXStream. getInstance(.register Converter(new V1AwareInstanceInfoConverter(), 21 XStream PRIORITY VERY HIGH); 23 if (isAws(this. applicationInfoManager getInfo))i 24 this. awsBinder new Aws BinderDelegate(this, eurekaserverconfig, 25 this, eurekaclient config, this, registry, this applicationInfoManager); 26 this. awsBinder. startoj 27} 29/初始化 eureka server上下文 30 EurekaServerContextHolder initialize(this. serverContext); 32 log. info("Initialized server context ") 34 // Copy registry from neighboring eureka node 5//从相邻的 eureka节点复制注册表 int registryCount this registry. syncUpo; 37//默认每30秒发送心跳,1分钟就是2次 8//修改 eureka状态为 39//同时,这里面会开启一个定时任务,用于清理6秒没有心跳的客户端。自动下线 this registry. openForTraffic(this, application InfoManag gistry count) 42// Register all monitoring statistics 43 EurekaMonitors registerAllstatso: 6@override 47 public int syncUpo t 18// Copy entire entry from neighboring Ds node t count=0 for (int i=0;((i< serverConfig. getRegistrySyncRetries())&&(count ==0)); i++)i 3 try t 54 Thread. sleep(server Config. getRegistrySyncRetryWaitMs()) 5 catch (InterruptedException e)t 56 logger. warn("Interrupted during registry transfer.") 5:7 break 60 Applications apps eurekaClient getApplications( 61 for (Application app apps getRegisteredApplications())( 2 for(InstanceInfo instance: app. getInstances())i 63 try t 64 if (isRegisterable(instance))t 65//将其他节点的实例注册到本节点 66 register(instance, instance. getLeaseInfo(). getDurationInSecs(), true); 67 count++ 69 catch (Throwable t)[ 70 logger error( During Ds init copy", t); 75 return count; 78@Override public void open ForTraffic(ApplicationInfoManager applicationInfoManager, int count)t 80 // Renewals happen every 30 seconds and for a minute it should be a factor of 2 81//计算每分钟最大续约数 82 this, expectedNumberofRenewsPerMin= count * 2; 83//每分钟最小续约数 84 this, numberofRenews PerMin Threshold 85(int)(this expectedNumberofRenewsPerMin x serverConfig. getRenewalPercentThreshold()) 6 logger. info( " Got count +instances from neighboring Ds node") 87 logger. info(" Renew threshold is:+ number ofRenewsPerMinThreshold) this startupTime System currentTimeMillis); 89 if(count>0)t go this. peerInstancesTransferEmptyonStartup false 92 DataCenter info name self applicationInfoManager. getInfo(. getDataCenterInfo(. getName i 93 boolean isAws Name Amazon = selfName if (isAws & server Config. shouldPrimeAwsReplicaConnectionso)i 95 logger. info("Priming AWs connections for all replicas. ") 96 prime AwSReplicas(applicationInfoManager): :8 logger. info( "Changing status to UP")j 99//没置实例的状态为UP 100 applicationInfoManager setInstanceStatus(Instancestatus UP) 101//开启定时任务,默认6秒执行一次,用于清理68秒之内没有续约的实例 102 super. postInitoi 103 105 protected void postInitot 106 renewsLastMin start(; 107 if(evictionTaskRef get()!= null)t 108 eviction Task Ref get(. cancel(); 110 evictionTaskRef set(new Eviction Task()); 111/服务剔除仟务 112 evictionTimer schedule(evictionTaskRef geto, 113 server Config. getEvictionIntervalTimerInMs(), server Config. getEvictionIntervalTimerInMs()) 115 从上面的 Eureka Configuration类,我们可以看到有个初始化 EurekaServerContext的方 法 I @Bean 2 public EurekaServerContext eurekaServerContext(ServerCodecs servercodecs, 3 PeerAwareInstanceRegistry registry, Peer EurekaNodes peer aNodes)t 4 return new DefaultEurekaServer Context(this eurekaServerConfig, serverCodecs, s registry, peer EurekaNodes, this applicationInfoManager); 6 Defaulteureka Server Context这个类里面的的 initialize方法是被@ PostConstruct这个注解修饰的, 在应用加载的时候,会执行这个方法 I public void initialize throws Exception f logger. info("Initializing ..") 3//启动一个线程,读取其他集群节点的信息,后面后续复制 1 peer EurekaNodes start; 6 registry. init(peer EurekaNodes); 7 logger, info("Initialized")i peer EurekaNodes start(0主要是启动一个只拥有一个线程的线程池,第一次进去会更新一下集群其他节点信息 然后启动了一个定时线程,每60秒更新一次,也就是说后续可以根据配置动态的修改节点配置。(原生的 spring cloud config支持) I public void startot 2 taskExecutor= Executors, newSingleThreadScheduled Executor( 3 new ThreadFactory()t @Override 5 public Thread newThread(Runnable r)t 6 Thread thread new Thread(r,"Eureka-PeerNodes Updater ); thread setDaemon (true) s return thread 2 try t 13//首次进来,更新集群节点信息 14 updatePeerEurekaNodes(resolvePeerUrls(); 5//搞个线程 16 Runnable peersUpdateTask new Runnable()i 17 @Override 18 public void runo t 19 try t 20 updatePeer EurekaNodes(resolvePeerUrls()) I catch(Throwable e)i logger error( "Cannot update the replica Nodes", e) 27 taskExecutor. schedulewithFixedDelay( UpdateTask 29 serverConfig. getPeerEurekaNodesUpdateIntervalMs(, 30 serverConfig. getPeerEurekaNodesUpdateIntervalMs(, 31 TimeUnit, mIlliseconds catch (Exception e)i 31 throw new IllegalstateException(e) 6 for( PeerEurekaNode node peer EurekaNodes 37 logger. info( "Replica node URL:+ node. getserviceUrlO): 40//根据URL构建 PeerEurekanode信息 11 protected PeerEureka Node createPeerEurekaNode( String peerEurekaNodeurl)i HttpreplicationClient replicationclient- Jerseyreplicationclient. createreplicationclient(serve cOnfig, serverCodecs, peerEurekaNodeUrl) String targetHost hostFromUrl(peer EurekaNodeUrl) 44 if(targetHost = null)i 45 targetHost ="host 47 return new Peer EurekaNode (registry, targetHost, peer EurekaNodeUrl, replicationclient, serverCor fig); 4、 Eureka client端源码分析 源码流程图参考:< eureka客户端源码分析 client初始化 @Inject 2 Discoveryclient(ApplicationInfoManager applicationInfoManager, Eurekaclientconfig config, Abstra ctDiscoveryClientoptionalArgs args 3 Provider<BackupRegistry> backupRegistryProvider )( 4//省略非关键代码。。。 6 logger. info("Initializing Eureka in region 0", clientConfig getRegion(); 8//省略非关键代码。 10 try t I1//de fault size of 2- 1 each for heartbeat and cacheRefresh 12 scheduler= Executors. newScheduledThreadPool(2 13 new Thread Factory Builder( 14 .setName Format("DiscoveryClient-%d") setDaemon(true) 16.build)) 18 heartbeatExecutor= new ThreadPoolExecutor( 19 1, clientConfig getHeartbeatExecutor ThreadPoolsizeo, 0, TimeUnit SECONDS, 0 new SynchronousQueue< Runnable>() 21 new ThreadFactory( 2. setName Format ("Discoveryclient-HeartbeatExecutor-%d") setDaemon(true) 24.bui1d() 25 );/ use direct handoff cacherefresh Executor new Thread Poolexecutor 28 1, client Config. get Cache RefreshExecutorThreadPoolsize(, 0, TimeUnit SECONDS, 29 new SynchronousQueue< Runnable>(), 30 new Thread Factory Builder( setName Format("DiscoveryClient-CacheRefreshExecutor-%") setDaemon (true) 33.build( 34 );//use direct handoff 86 eurekaTransport new EurekaTransport() 37 scheduleServerEndpoint Task(eurekaTransport, args 39 AZTORegionMapper azToRegionMapper, 40 if (clientConfig shouldUseDns ForFetchingServiceurlso))i 41 azToRegionMapper new DNS BasedAzToRegionMapper(client Config); 43 azToRegion Mapper new Property BasedAz ToRegionMapper(clientConfig): 45 if(null I= remoteRegionsToFetchget()( 46 azToRegionMapper setRegions ToFetch(remoteRegionsToFetch get().split(","))i 48 instanceRegionchecker new Instance Regionchecker(azToRegionMapper, client Config. getRegiono) 3 catch(Throwable e)t 50 throw new Runtime Exception("Failed to initialize Discoveryclient!",e); 53 if (client Config. shouldFetchRegistry()&&! fetchRegistry(false))[ 4 fetchRegistryF romBackupo)i 57// call and execute the pre registration handler before all background tasks (inc registration) is started 8 if(this. preRegistrationHandler I= null)[ 59 this. preRegistrationHandler beforeRegistration () 62 if (client Config. shouldRegisterwithEureka()&& client Config. shouldEnforceRegistrationAtInito)) try i 64 if (!register())t 65 throw new IllegalstateException("Registration error at startup, Invalid server response. 67 catch (Throwable th)i 68 logger error ("Registration error at startup: [" th getMessage()); 69 throw new IllegalStateException(th); 73/最核心代码

...展开详情
img
皮皮今

关注 私信 TA的资源

上传资源赚积分,得勋章
最新资源