SpringCloud
笔记;伪代码
# 版本选择
https://spring.io/projects/spring-cloud (opens new window)
SpringCloud版本号命名为英国伦敦地铁站
Release Train | Boot Version |
---|---|
2021.0.x (opens new window) | |
aka Jubilee | 2.6.x |
2020.0.x (opens new window) | |
aka Ilford | 2.4.x, 2.5.x (Starting with 2020.0.3) |
Hoxton (opens new window) | 2.2.x, 2.3.x (Starting with SR5) |
Greenwich (opens new window) | 2.1.x |
Finchley (opens new window) | 2.0.x |
Edgware (opens new window) | 1.5.x |
Dalston (opens new window) | 1.5.x |
更详细版本对比
https://start.spring.io/actuator/info (opens new window)
Spring Cloud Alibaba版本说明
https://github.com/alibaba/spring-cloud-alibaba/wiki/版本说明 (opens new window)
# 组件版本关系
Spring Cloud Alibaba Version | Sentinel Version | Nacos Version | RocketMQ Version | Dubbo Version | Seata Version |
---|---|---|---|---|---|
2.2.7.RELEASE* | 1.8.1 | 2.0.3 | 4.6.1 | 2.7.13 | 1.3.0 |
2.2.6.RELEASE | 1.8.1 | 1.4.2 | 4.4.0 | 2.7.8 | 1.3.0 |
2021.1 or 2.2.5.RELEASE or 2.1.4.RELEASE or 2.0.4.RELEASE | 1.8.0 | 1.4.1 | 4.4.0 | 2.7.8 | 1.3.0 |
2.2.3.RELEASE or 2.1.3.RELEASE or 2.0.3.RELEASE | 1.8.0 | 1.3.3 | 4.4.0 | 2.7.8 | 1.3.0 |
2.2.1.RELEASE or 2.1.2.RELEASE or 2.0.2.RELEASE | 1.7.1 | 1.2.1 | 4.4.0 | 2.7.6 | 1.2.0 |
2.2.0.RELEASE | 1.7.1 | 1.1.4 | 4.4.0 | 2.7.4.1 | 1.0.0 |
2.1.1.RELEASE or 2.0.1.RELEASE or 1.5.1.RELEASE | 1.7.0 | 1.1.4 | 4.4.0 | 2.7.3 | 0.9.0 |
2.1.0.RELEASE or 2.0.0.RELEASE or 1.5.0.RELEASE | 1.6.3 | 1.1.1 | 4.4.0 | 2.7.3 | 0.7.1 |
# 毕业版本依赖关系(推荐使用)
Spring Cloud Alibaba Version | Spring Cloud Version | Spring Boot Version |
---|---|---|
2.2.7.RELEASE | Spring Cloud Hoxton.SR12 | 2.3.12.RELEASE |
2021.1 | Spring Cloud 2020.0.1 | 2.4.2 |
2.2.6.RELEASE | Spring Cloud Hoxton.SR9 | 2.3.2.RELEASE |
2.1.4.RELEASE | Spring Cloud Greenwich.SR6 | 2.1.13.RELEASE |
2.2.1.RELEASE | Spring Cloud Hoxton.SR3 | 2.2.5.RELEASE |
2.2.0.RELEASE | Spring Cloud Hoxton.RELEASE | 2.2.X.RELEASE |
2.1.2.RELEASE | Spring Cloud Greenwich | 2.1.X.RELEASE |
2.0.4.RELEASE(停止维护,建议升级) | Spring Cloud Finchley | 2.0.X.RELEASE |
1.5.1.RELEASE(停止维护,建议升级) | Spring Cloud Edgware | 1.5.X.RELEASE |
本次选用版本
Spring Cloud Alibaba Version | SpringCloud | Spring Boot Version | Nacos Version | JDK |
---|---|---|---|---|
2.2.5.RELEASE | Spring Cloud Hoxton.SR8 | 2.3.2.RELEASE | 1.4.1 | 1.8 |
# Eureka服务注册与发现
Eureka包含两个组件:Eureka Server和Eureka Client
- Eureka Server提供服务注册服务 各个微服务节点通过配置启动后,会在EurekaServer中进行注册,这样EurekaServer中的服务注册表中将会存储所有可用服务节点的信息,服务节点的信息可以在界面中直观看到。
- EurekaClient通过注册中心进行访问 是一个Java客户端,用于简化Eureka Server的交互,客户端同时也具备一个内置的、使用轮询(round-robin)负载算法的负载均衡器。在应用启动后,将会向Eureka Server发送心跳(默认周期为30秒)。如果Eureka Server在多个心跳周期内没有接收到某个节点的心跳,EurekaServer将会从服务注册表中把这个服务节点移除(默认90秒)
# 单机Eureka构建步骤
# 注册中心
导入eureka-server依赖
<!--eureka-server-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
</dependency>
编写配置文件
server:
port: 7001
eureka:
instance:
hostname: localhost #eureka服务端的实例名称
client:
#false表示不向注册中心注册自己。
register-with-eureka: false
#false表示自己端就是注册中心,我的职责就是维护服务实例,并不需要去检索服务
fetch-registry: false
service-url:
#设置与Eureka Server交互的地址查询服务和注册服务都需要依赖这个地址。
defaultZone: http://${eureka.instance.hostname}:${server.port}/eureka/
添加注解@EnableEurekaServer
@SpringBootApplication
@EnableEurekaServer
public class EurekaMain7001
{
public static void main(String[] args)
{
SpringApplication.run(EurekaMain7001.class,args);
}
}
浏览器查看http://localhost:7001/
# 服务提供者
导入eureka-client依赖
<!--eureka-client-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
编写配置
eureka:
client:
#表示是否将自己注册进EurekaServer默认为true。
register-with-eureka: true
#是否从EurekaServer抓取已有的注册信息,默认为true。单节点无所谓,集群必须设置为true才能配合ribbon使用负载均衡
fetchRegistry: true
service-url:
defaultZone: http://localhost:7001/eureka
添加注解@EnableEurekaClient
@SpringBootApplication
@EnableEurekaClient
public class PaymentMain8001 {
public static void main(String[] args) {
SpringApplication.run(PaymentMain8001.class);
}
}
# 集群Eureka构建步骤
修改hosts文件,方便测试。C:\Windows\System32\drivers\etc
127.0.0.1 eureka7001.com
127.0.0.1 eureka7002.com
建立两个eureka-server,不同端口,相互注册
7001服务注册到7002
server:
port: 7001
eureka:
instance:
hostname: eureka7001.com #eureka服务端的实例名称
client:
register-with-eureka: false #false表示不向注册中心注册自己。
fetch-registry: false #false表示自己端就是注册中心,我的职责就是维护服务实例,并不需要去检索服务
service-url:
defaultZone: http://eureka7002.com:7002/eureka/
7002服务注册到7001
server:
port: 7002
eureka:
instance:
hostname: eureka7002.com #eureka服务端的实例名称
client:
register-with-eureka: false #false表示不向注册中心注册自己。
fetch-registry: false #false表示自己端就是注册中心,我的职责就是维护服务实例,并不需要去检索服务
service-url:
defaultZone: http://eureka7001.com:7001/eureka/
http://eureka7002.com:7002/和http://eureka7001.com:7001/都可以进行访问
将服务提供端注册到eureka集群
server:
# 另外一台端口
# port: 8001
port: 8002
eureka:
client:
#表示是否将自己注册进EurekaServer默认为true。
register-with-eureka: true
#是否从EurekaServer抓取已有的注册信息,默认为true。单节点无所谓,集群必须设置为true才能配合ribbon使用负载均衡
fetchRegistry: true
service-url:
defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eureka
将服务消费端注册到eureka集群
server:
port: 80
spring:
application:
name: cloud-order-service
eureka:
client:
#表示是否将自己注册进EurekaServer默认为true。
register-with-eureka: true
#是否从EurekaServer抓取已有的注册信息,默认为true。单节点无所谓,集群必须设置为true才能配合ribbon使用负载均衡
fetchRegistry: true
service-url:
defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eureka
由于有多个服务提供者的实例,远程调用地址不能写死,所以这时候就要用服务名称代替ip+端口
// private static final String PAYMENT_URL = "http://localhost:8001";
private static final String PAYMENT_URL = "http://CLOUD-PAYMENT-SERVICE";
虽然用服务名称了,但是有多个实例,不知道使用哪个实例,所以就要指定使用策略,可以使用(客户端)负载均衡
@Bean
// 负载均衡
@LoadBalanced
public RestTemplate restTemplate() {
return new RestTemplate();
}
# actuator微服务信息完善
eureka:
instance:
# 主机名称:服务名称修改
instance-id: payment8001
# 访问信息有IP信息提示
prefer-ip-address: true
# 服务发现Discovery
对于注册进eureka里面的服务,可以通过服务发现来获得该服务的信息
@Resource
private DiscoveryClient discoveryClient;
@GetMapping("/payment/discovery")
public Object discovery() {
List<String> services = discoveryClient.getServices();
// 获取所有服务
for (String service : services) {
log.info(service);
}
// 根据服务名,获取此服务的所有实例
List<ServiceInstance> instances = discoveryClient.getInstances("CLOUD-PAYMENT-SERVICE");
for (ServiceInstance instance : instances) {
log.info("{}|{}|{}|{}|{}",instance.getServiceId(),instance.getInstanceId(),instance.getHost(),instance.getPort(),instance.getUri());
}
return this.discoveryClient;
}
添加注解
@SpringBootApplication
@EnableEurekaClient
// 启用服务发现
@EnableDiscoveryClient
public class PaymentMain8001 {
public static void main(String[] args) {
SpringApplication.run(PaymentMain8001.class);
}
}
# Eureka自我保护
为什么会产生Eureka自我保护机制?
- 为了防止EurekaClient可以正常运行,但是 与 EurekaServer网络不通情况下,EurekaServer不会立刻将EurekaClient服务剔除
什么是自我保护模式?
- 默认情况下,如果EurekaServer在一定时间内没有接收到某个微服务实例的心跳,EurekaServer将会注销该实例(默认90秒)。但是当网络分区故障发生(延时、卡顿、拥挤)时,微服务与EurekaServer之间无法正常通信,以上行为可能变得非常危险了——因为微服务本身其实是健康的,此时本不应该注销这个微服务。Eureka通过“自我保护模式”来解决这个问题——当EurekaServer节点在短时间内丢失过多客户端时(可能发生了网络分区故障),那么这个节点就会进入自我保护模式。
如何关闭自我保护?
- 注册中心
eureka:
server:
#关闭自我保护机制,保证不可用服务被及时踢除
enable-self-preservation: false
# 清理无效节点的时间间隔
eviction-interval-timer-in-ms: 2000
- 服务端
eureka:
instance:
#Eureka客户端向服务端发送心跳的时间间隔,单位为秒(默认是30秒)
lease-renewal-interval-in-seconds: 3
#Eureka服务端在收到最后一次心跳后等待时间上限,单位为秒(默认是90秒),超时将剔除服务
lease-expiration-duration-in-seconds: 5
# Zookeeper服务注册与发现
# 环境搭建
安装docker pull zookeeper
启动docker run --name myZookeeper --restart always -e JVMFLAGS="-Xmx1024m" -p 2181:2181 zookeeper
查看
- 进入容器
docker exec -it myZookeeper /bin/bash
- 连接客户端
zkClinet.sh
# 服务提供者
新建项目导入依赖
<!-- SpringBoot整合zookeeper客户端 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-zookeeper-discovery</artifactId>
</dependency>
编写配置
#8004表示注册到zookeeper服务器的服务提供者端口号
server:
port: 8004
#服务别名----注册zookeeper到注册中心名称
spring:
application:
name: cloud-provider-payment
cloud:
zookeeper:
connect-string: 192.168.83.128:2181
启动类
@SpringBootApplication
@EnableDiscoveryClient //该注解用于向使用consul或者zookeeper作为注册中心时注册服务
public class PaymentMain8004
{
public static void main(String[] args)
{
SpringApplication.run(PaymentMain8004.class,args);
}
}
controller
@RestController
public class PaymentController {
@Value("${server.port}")
private String serverPort;
@RequestMapping(value = "/payment/zk")
public String paymentzk() {
return "springcloud with zookeeper: " + serverPort + "\t" + UUID.randomUUID().toString();
}
}
启动测试,如果报错可能是版本冲突
<!-- SpringBoot整合zookeeper客户端 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-zookeeper-discovery</artifactId>
<!--先排除自带的zookeeper3.5.3-->
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--添加zookeeper3.4.9版本-->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.9</version>
</dependency>
查看zookeeper
[zk: localhost:2181(CONNECTED) 3] ls /
[services, zookeeper]
[zk: localhost:2181(CONNECTED) 5] ls /services
[cloud-provider-payment]
[zk: localhost:2181(CONNECTED) 6] ls /services/cloud-provider-payment
[b3c0c540-5727-4e15-98d9-d86bf7172d0f]
[zk: localhost:2181(CONNECTED) 7] ls /services/cloud-provider-payment/b3c0c540-5727-4e15-98d9-d86bf7172d0f
[]
[zk: localhost:2181(CONNECTED) 8] get /services/cloud-provider-payment/b3c0c540-5727-4e15-98d9-d86bf7172d0f
{"name":"cloud-provider-payment","id":"b3c0c540-5727-4e15-98d9-d86bf7172d0f","address":"localhost","port":8004,"sslPort":null,"payload":{"@class":"org.springframework.cloud.zookeeper.discovery.ZookeeperInstance","id":"application-1","name":"cloud-provider-payment","metadata":{}},"registrationTimeUTC":1642591350845,"serviceType":"DYNAMIC","uriSpec":{"parts":[{"value":"scheme","variable":true},{"value":"://","variable":false},{"value":"address","variable":true},{"value":":","variable":false},{"value":"port","variable":true}]}}
cZxid = 0xc
ctime = Wed Jan 19 11:22:32 GMT 2022
mZxid = 0xc
mtime = Wed Jan 19 11:22:32 GMT 2022
pZxid = 0xc
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x17e71fddd480003
dataLength = 530
numChildren = 0
# 服务消费者
依赖
<!-- SpringBoot整合zookeeper客户端 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-zookeeper-discovery</artifactId>
<!--先排除自带的zookeeper-->
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--添加zookeeper3.4.9版本-->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.9</version>
</dependency>
配置
server:
port: 80
spring:
application:
name: cloud-consumer-order
cloud:
#注册到zookeeper地址
zookeeper:
connect-string: 192.168.83.128:2181
启动类
@SpringBootApplication
@EnableDiscoveryClient
public class OrderZKMain80
{
@Bean
@LoadBalanced
public RestTemplate restTemplate() {
return new RestTemplate();
}
public static void main(String[] args)
{
SpringApplication.run(OrderZKMain80.class,args);
}
}
controller
@RestController
public class OrderZKController
{
public static final String INVOKE_URL = "http://cloud-provider-payment";
@Autowired
private RestTemplate restTemplate;
@RequestMapping(value = "/consumer/payment/zk")
public String paymentInfo()
{
String result = restTemplate.getForObject(INVOKE_URL+"/payment/zk", String.class);
System.out.println("消费者调用支付服务(zookeeper)--->result:" + result);
return result;
}
}
# Consul服务注册与发现
# 环境搭建
下载地址https://www.consul.io/downloads.html
exe的目录下,执行./consul agent -dev
命令,使用开发模式启动,通过浏览器查看http://localhost:8500
# 服务提供者
依赖
<!--SpringCloud consul-server -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-consul-discovery</artifactId>
</dependency>
配置
###consul服务端口号
server:
port: 8006
spring:
application:
name: consul-provider-payment
####consul注册中心地址
cloud:
consul:
host: localhost
port: 8500
discovery:
#hostname: 127.0.0.1
service-name: ${spring.application.name}
启动类
@SpringBootApplication
@EnableDiscoveryClient
public class PaymentMain8006
{
public static void main(String[] args)
{
SpringApplication.run(PaymentMain8006.class,args);
}
}
controller
@RestController
public class PaymentController
{
@Value("${server.port}")
private String serverPort;
@GetMapping("/payment/consul")
public String paymentInfo()
{
return "springcloud with consul: "+serverPort+"\t\t"+ UUID.randomUUID().toString();
}
}
# 服务消费者
依赖
<!--SpringCloud consul-server -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-consul-discovery</artifactId>
</dependency>
配置
###consul服务端口号
server:
port: 80
spring:
application:
name: cloud-consumer-order
####consul注册中心地址
cloud:
consul:
host: localhost
port: 8500
discovery:
#hostname: 127.0.0.1
service-name: ${spring.application.name}
启动类
@SpringBootApplication
@EnableDiscoveryClient //该注解用于向使用consul或者zookeeper作为注册中心时注册服务
public class OrderConsulMain80
{
@Bean
@LoadBalanced
public RestTemplate restTemplate() {
return new RestTemplate();
}
public static void main(String[] args)
{
SpringApplication.run(OrderConsulMain80.class,args);
}
}
controller
@RestController
public class OrderConsulController
{
public static final String INVOKE_URL = "http://cloud-provider-payment"; //consul-provider-payment
@Autowired
private RestTemplate restTemplate;
@GetMapping(value = "/consumer/payment/consul")
public String paymentInfo()
{
String result = restTemplate.getForObject(INVOKE_URL+"/payment/consul", String.class);
System.out.println("消费者调用支付服务(consule)--->result:" + result);
return result;
}
}
# Ribbon负载均衡
# 概述
LB负载均衡(Load Balance)是什么
- 简单的说就是将用户的请求平摊的分配到多个服务上,从而达到系统的HA(高可用)。 常见的负载均衡有软件Nginx,LVS,硬件 F5等。
Ribbon本地负载均衡客户端 VS Nginx服务端负载均衡 区别
- Nginx是服务器负载均衡,客户端所有请求都会交给nginx,然后由nginx实现转发请求。即负载均衡是由服务端实现的。
- Ribbon本地负载均衡,在调用微服务接口时候,会在注册中心上获取注册信息服务列表之后缓存到JVM本地,从而在本地实现RPC远程服务调用技术。
集中式负载均衡
- 即在服务的消费方和提供方之间使用独立的LB设施(可以是硬件,如F5, 也可以是软件,如nginx), 由该设施负责把访问请求通过某种策略转发至服务的提供方;
进程内负载均衡
- 将LB逻辑集成到消费方,消费方从服务注册中心获知有哪些地址可用,然后自己再从这些地址中选择出一个合适的服务器。 Ribbon就属于进程内LB,它只是一个类库,集成于消费方进程,消费方通过它来获取到服务提供方的地址。
总结:负载均衡+RestTemplate调用,Ribbon其实就是一个软负载均衡的客户端组件,他可以和其他所需请求的客户端结合使用,和eureka结合只是其中的一个实例。
# 核心组件
常用策略
- RoundRobinRule:轮询
- RandomRule:随机
- RetryRule:先按照RoundRobinRule的策略获取服务,如果获取服务失败则在指定时间内会进行重试,获取可用的服务
- WeightedResponseTimeRule:对RoundRobinRule的扩展,响应速度越快的实例选择权重越大,越容易被选择
- BestAvailableRule:会先过滤掉由于多次访问故障而处于断路器跳闸状态的服务,然后选择一个并发量最小的服务
- AvailabilityFilteringRule:先过滤掉故障实例,再选择并发较小的实例
- ZoneAvoidanceRule:默认规则,复合判断server所在区域的性能和server的可用性选择服务器
# 修改默认策略
https://www.springcloud.cc/spring-cloud-netflix.html (opens new window)
您可以使用<client>.ribbon.*
中的外部属性来配置Ribbon客户端的某些位,这与使用Netflix API本身没有什么不同,只能使用Spring Boot配置文件。本机选项可以在CommonClientConfigKey
(功能区内核心部分)中作为静态字段进行检查。
Spring Cloud还允许您通过使用@RibbonClient
声明其他配置(位于RibbonClientConfiguration
之上)来完全控制客户端。例:
@Configuration
@RibbonClient(name = "foo", configuration = FooConfiguration.class)
public class TestConfiguration {
}
在这种情况下,客户端由RibbonClientConfiguration
中已经存在的组件与FooConfiguration
中的任何组件组成(后者通常会覆盖前者)。
| 警告 | FooConfiguration
必须是@Configuration
,但请注意,主应用程序上下文不属于@ComponentScan
,否则将由@RibbonClients
共享。如果您使用@ComponentScan
(或@SpringBootApplication
),则需要采取措施避免包含(例如将其放在一个单独的,不重叠的包中,或者指定要在@ComponentScan
)。 |
---|
总结 |
否则我们自定义的这个配置类就会被所有的Ribbon客户端所共享,达不到特殊化定制的目的了。 |
新建一个自定义配置类,和启动类不在同一个包下
package com.starry.myrule;
@Configuration
public class MyselfRule {
@Bean
public IRule myRule() {
return new RandomRule();
}
}
启动类添加注解@RibbonClient
package com.starry.springcloud;
@SpringBootApplication
@EnableEurekaClient
// 对CLOUD-PAYMENT-SERVICE服务,使用自定义的负载均衡策略
@RibbonClient(name = "CLOUD-PAYMENT-SERVICE",configuration = MyselfRule.class)
public class OrderMain80 {
public static void main(String[] args) {
SpringApplication.run(OrderMain80.class);
}
}
# 自定义策略
# 原理
负载均衡算法:rest接口第几次请求数 % 服务器集群总数量 = 实际调用服务器位置下标 ,每次服务重启动后rest接口计数从1开始。
List<ServiceInstance> instances = discoveryClient.getInstances("CLOUD-PAYMENT-SERVICE");
如: List [0] instances = 127.0.0.1:8002 List [1] instances = 127.0.0.1:8001
8001+ 8002 组合成为集群,它们共计2台机器,集群总数为2, 按照轮询算法原理:
当总请求数为1时: 1 % 2 =1 对应下标位置为1 ,则获得服务地址为127.0.0.1:8001 当总请求数位2时: 2 % 2 =0 对应下标位置为0 ,则获得服务地址为127.0.0.1:8002 当总请求数位3时: 3 % 2 =1 对应下标位置为1 ,则获得服务地址为127.0.0.1:8001 当总请求数位4时: 4 % 2 =0 对应下标位置为0 ,则获得服务地址为127.0.0.1:8002 如此类推......
# 源码
RoundRobinRule(轮询)源码
choose方法返回一个服务实例,RoundRobinRule继承AbstractLoadBalancerRule,但是AbstractLoadBalancerRule没有choose方法,所以就是实现IRule的接口。
public class RoundRobinRule extends AbstractLoadBalancerRule {
// 原子类Integer
private AtomicInteger nextServerCyclicCounter;
// 只要可用的服务(没有用到)
private static final boolean AVAILABLE_ONLY_SERVERS = true;
// 所有服务(没有用到)
private static final boolean ALL_SERVERS = false;
private static Logger log = LoggerFactory.getLogger(RoundRobinRule.class);
// 实例化原子类Integer
public RoundRobinRule() {
nextServerCyclicCounter = new AtomicInteger(0);
}
public RoundRobinRule(ILoadBalancer lb) {
this();
setLoadBalancer(lb);
}
public Server choose(ILoadBalancer lb, Object key) {
if (lb == null) {
log.warn("no load balancer");
return null;
}
Server server = null;
int count = 0;
while (server == null && count++ < 10) {
// 获取可用的服务(实例)
List<Server> reachableServers = lb.getReachableServers();
// 所有服务(服务可达和不可达)
List<Server> allServers = lb.getAllServers();
// 可用服务数量
int upCount = reachableServers.size();
// 总服务数量,根据这个取余(高可用,AP)
int serverCount = allServers.size();
// 服务不可达
if ((upCount == 0) || (serverCount == 0)) {
log.warn("No up servers available from load balancer: " + lb);
return null;
}
// 请求次数累加,并返回服务索引
int nextServerIndex = incrementAndGetModulo(serverCount);
// 在list中通过索引获取服务
server = allServers.get(nextServerIndex);
// 如果服务为空,可能网络延迟等原因,先让出cpu,重试(下次循环)
if (server == null) {
Thread.yield();
continue;
}
// 服务是活的就返回这个服务
if (server.isAlive() && (server.isReadyToServe())) {
return (server);
}
// Next.
server = null;
}
// 如果循环10次都没有成功,就打印日志
if (count >= 10) {
log.warn("No available alive servers after 10 tries from load balancer: "
+ lb);
}
return server;
}
// CAS+自旋锁
private int incrementAndGetModulo(int modulo) {
for (;;) {
// 获取当前值
int current = nextServerCyclicCounter.get();
// 当前值+1,对实例总数取余,得到索引
int next = (current + 1) % modulo;
// CAS更新数据,如果更新成功就返回索引,失败就自旋(循环,重试)
if (nextServerCyclicCounter.compareAndSet(current, next))
return next;
}
}
@Override
public Server choose(Object key) {
return choose(getLoadBalancer(), key);
}
@Override
public void initWithNiwsConfig(IClientConfig clientConfig) {
// 初始化配置,什么都没做
}
}
# 手写一个
继承AbstractLoadBalancerRule,实现choose接口,添加到spring容器
@Component
public class MyRoundRule extends AbstractLoadBalancerRule {
@Override
public void initWithNiwsConfig(IClientConfig iClientConfig) {
}
// 计数器
private AtomicInteger number;
// 是否只统计可用实例
private boolean availableOnlyServers = true;
public boolean isAvailableOnlyServers() {
return availableOnlyServers;
}
public void setAvailableOnlyServers(boolean availableOnlyServers) {
this.availableOnlyServers = availableOnlyServers;
}
public MyRoundRule() {
number = new AtomicInteger(0);
}
public MyRoundRule(boolean availableOnlyServers) {
this();
this.availableOnlyServers = availableOnlyServers;
}
public MyRoundRule(ILoadBalancer lb) {
this();
setLoadBalancer(lb);
}
@Override
public Server choose(Object key) {
ILoadBalancer lb = getLoadBalancer();
Server server = null;
int cycleNumber = 0;
// 最大循环次数 只显示可用就循环3次,否则就循环10次
int cycleMaxNumber = availableOnlyServers ? 3 : 10;
while (server == null && cycleNumber++ < cycleMaxNumber) {
List<Server> serverList;
if (availableOnlyServers) {
serverList = lb.getReachableServers();
} else {
serverList = lb.getAllServers();
}
int serverCount = serverList.size();
if (serverCount == 0) {
return null;
}
int nextServerIndex = incrementAndGetModulo(serverCount);
server = serverList.get(nextServerIndex);
if (server == null) {
Thread.yield();
continue;
}
if (server.isAlive() && (server.isReadyToServe())) {
return (server);
}
server = null;
}
return server;
}
private int incrementAndGetModulo(int serverCount) {
int current;
int index;
do {
current = number.get();
index = (number.get() + 1) % serverCount;
} while (!number.compareAndSet(current, index));
return index;
}
}
修改配置
@Configuration
public class MyselfRule {
@Bean
public IRule myRule() {
// return new RandomRule();
return new MyRoundRule(true);
}
}
启动类添加注解
@SpringBootApplication
@EnableEurekaClient
@RibbonClient(name = "CLOUD-PAYMENT-SERVICE",configuration = MyselfRule.class)
public class OrderMain80 {
public static void main(String[] args) {
SpringApplication.run(OrderMain80.class);
}
}
# OpenFeign服务接口调用
# 概述
Feign是一个声明式的Web服务客户端,让编写Web服务客户端变得非常容易,只需创建一个接口并在接口上添加注解即可
Feign能干什么
- Feign旨在使编写Java Http客户端变得更容易。 前面在使用Ribbon+RestTemplate时,利用RestTemplate对http请求的封装处理,形成了一套模版化的调用方法。但是在实际开发中,由于对服务依赖的调用可能不止一处,往往一个接口会被多处调用,所以通常都会针对每个微服务自行封装一些客户端类来包装这些依赖服务的调用。所以,Feign在此基础上做了进一步封装,由他来帮助我们定义和实现依赖服务接口的定义。在Feign的实现下,我们只需创建一个接口并使用注解的方式来配置它(以前是Dao接口上面标注Mapper注解,现在是一个微服务接口上面标注一个Feign注解即可),即可完成对服务提供方的接口绑定,简化了使用Spring cloud Ribbon时,自动封装服务调用客户端的开发量。
Feign集成了Ribbon 利用Ribbon维护了服务列表信息,并且通过轮询实现了客户端的负载均衡。而与Ribbon不同的是,通过feign只需要定义服务绑定接口且以声明式的方法,优雅而简单的实现了服务调用
Feign与OpenFeign区别
- Feign是Spring Cloud组件中的一个轻量级RESTful的HTTP服务客户端 Feign内置了Ribbon,用来做客户端负载均衡,去调用服务注册中心的服务。Feign的使用方式是:使用Feign的注解定义接口,调用这个接口,就可以调用服务注册中心的服务
- OpenFeign是Spring Cloud 在Feign的基础上支持了SpringMVC的注解,如@RequesMapping等等。OpenFeign的@FeignClient可以解析SpringMVC的@RequestMapping注解下的接口,并通过动态代理的方式产生实现类,实现类中做负载均衡并调用其他服务。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-feign</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
# 服务端使用
依赖
<!--openfeign-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<!--eureka client-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
配置
server:
port: 80
eureka:
client:
register-with-eureka: false
service-url:
defaultZone: http://eureka7001.com:7001/eureka/,http://eureka7002.com:7002/eureka/
启动类添加注解@EnableFeignClients
@SpringBootApplication
@EnableFeignClients
public class OrderFeignMain80
{
public static void main(String[] args)
{
SpringApplication.run(OrderFeignMain80.class,args);
}
}
service远程调用
@Component
// 要调用的服务名称
@FeignClient("CLOUD-PAYMENT-SERVICE")
public interface PaymentFeignService {
// 直接把服务提供方的controller复制
@GetMapping(value = "/payment/get/{id}")
CommonResult<Payment> getPaymentById(@PathVariable("id") Long id);
}
controller
@RestController
public class OrderFeignController
{
@Resource
private PaymentFeignService paymentFeignService;
@GetMapping(value = "/consumer/payment/get/{id}")
public CommonResult<Payment> getPaymentById(@PathVariable("id") Long id)
{
return paymentFeignService.getPaymentById(id);
}
}
测试http://localhost/consumer/payment/get/1自带负载均衡
# 修改超时配置
默认超时时间1s
org/springframework/cloud/netflix/ribbon/RibbonClientConfiguration.java
/**
* Ribbon client default connect timeout.
*/
public static final int DEFAULT_CONNECT_TIMEOUT = 1000;
/**
* Ribbon client default read timeout.
*/
public static final int DEFAULT_READ_TIMEOUT = 1000;
修改消费端配置
#设置feign客户端超时时间(OpenFeign默认支持ribbon)
ribbon:
#指的是建立连接所用的时间,适用于网络状况正常的情况下,两端连接所用的时间
ReadTimeout: 5000
#指的是建立连接后从服务器读取到可用资源所用的时间response
ConnectTimeout: 5000
# 日志打印
Feign 提供了日志打印功能,我们可以通过配置来调整日志级别,从而了解 Feign 中 Http 请求的细节。 说白了就是对Feign接口的调用情况进行监控和输出
日志级别
- NONE:默认的,不显示任何日志;
- BASIC:仅记录请求方法、URL、响应状态码及执行时间;
- HEADERS:除了 BASIC 中定义的信息之外,还有请求和响应的头信息;
- FULL:除了 HEADERS 中定义的信息之外,还有请求和响应的正文及元数据。
配置日志bean
@Configuration
public class FeignConfig {
@Bean
Logger.Level feignLoggerLevel() {
return Logger.Level.FULL;
}
}
yaml配置日志级别
logging:
level:
com.starry.springcloud.service.PaymentFeignService: debug
控制台输出
2022-01-22 21:46:25.535 DEBUG 13100 --- [p-nio-80-exec-2] c.s.s.service.PaymentFeignService : [PaymentFeignService#getPaymentById] ---> GET http://CLOUD-PAYMENT-SERVICE/payment/get/1 HTTP/1.1
2022-01-22 21:46:25.535 DEBUG 13100 --- [p-nio-80-exec-2] c.s.s.service.PaymentFeignService : [PaymentFeignService#getPaymentById] ---> END HTTP (0-byte body)
2022-01-22 21:46:28.542 DEBUG 13100 --- [p-nio-80-exec-2] c.s.s.service.PaymentFeignService : [PaymentFeignService#getPaymentById] <--- HTTP/1.1 200 (3006ms)
2022-01-22 21:46:28.542 DEBUG 13100 --- [p-nio-80-exec-2] c.s.s.service.PaymentFeignService : [PaymentFeignService#getPaymentById] connection: keep-alive
2022-01-22 21:46:28.542 DEBUG 13100 --- [p-nio-80-exec-2] c.s.s.service.PaymentFeignService : [PaymentFeignService#getPaymentById] content-type: application/json
2022-01-22 21:46:28.544 DEBUG 13100 --- [p-nio-80-exec-2] c.s.s.service.PaymentFeignService : [PaymentFeignService#getPaymentById] date: Sat, 22 Jan 2022 13:46:28 GMT
2022-01-22 21:46:28.544 DEBUG 13100 --- [p-nio-80-exec-2] c.s.s.service.PaymentFeignService : [PaymentFeignService#getPaymentById] keep-alive: timeout=60
2022-01-22 21:46:28.544 DEBUG 13100 --- [p-nio-80-exec-2] c.s.s.service.PaymentFeignService : [PaymentFeignService#getPaymentById] transfer-encoding: chunked
2022-01-22 21:46:28.544 DEBUG 13100 --- [p-nio-80-exec-2] c.s.s.service.PaymentFeignService : [PaymentFeignService#getPaymentById]
2022-01-22 21:46:28.544 DEBUG 13100 --- [p-nio-80-exec-2] c.s.s.service.PaymentFeignService : [PaymentFeignService#getPaymentById] {"code":200,"message":"查询成功","data":{"id":1,"serial":"test"}}
2022-01-22 21:46:28.544 DEBUG 13100 --- [p-nio-80-exec-2] c.s.s.service.PaymentFeignService : [PaymentFeignService#getPaymentById] <--- END HTTP (69-byte body)
# Hystrix断路器
# 概述
Hystrix是一个用于处理分布式系统的延迟和容错的开源库,在分布式系统里,许多依赖不可避免的会调用失败,比如超时、异常等,Hystrix能够保证在一个依赖出问题的情况下,不会导致整体服务失败,避免级联故障,以提高分布式系统的弹性。
“断路器”本身是一种开关装置,当某个服务单元发生故障之后,通过断路器的故障监控(类似熔断保险丝),向调用方返回一个符合预期的、可处理的备选响应(FallBack),而不是长时间的等待或者抛出调用方无法处理的异常,这样就保证了服务调用方的线程不会被长时间、不必要地占用,从而避免了故障在分布式系统中的蔓延,乃至雪崩。
# 核心功能
服务降级
- 服务器忙,请稍后再试,不让客户端等待并立刻返回一个友好提示,fallback
- 哪些情况会触发降级:
- 程序运行异常
- 超时
- 服务熔断触发服务降级
- 线程池/信号量打满也会导致服务降级
服务熔断
- 类比保险丝达到最大服务访问后,直接拒绝访问,拉闸限电,然后调用服务降级的方法并返回友好提示
服务限流
- 秒杀高并发等操作,严禁一窝蜂的过来拥挤,大家排队,一秒钟N个,有序进行
# 服务提供方
依赖
<!--hystrix-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
</dependency>
<!--eureka client-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
配置
server:
port: 8001
spring:
application:
name: cloud-provider-hystrix-payment
eureka:
client:
register-with-eureka: true
fetch-registry: true
service-url:
#defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eureka
defaultZone: http://eureka7001.com:7001/eureka
启动类
@SpringBootApplication
@EnableEurekaClient //本服务启动后会自动注册进eureka服务中
public class PaymentHystrixMain8001
{
public static void main(String[] args)
{
SpringApplication.run(PaymentHystrixMain8001.class,args);
}
}
service
@Service
public class PaymentService {
/**
* 正常访问,一切OK
*
* @param id
* @return
*/
public String paymentInfo_OK(Integer id) {
return "线程:" + Thread.currentThread().getName() + "paymentInfo_OK,id: " + id + "\t" + "O(∩_∩)O";
}
/**
* 超时访问,演示降级
*
* @param id
* @return
*/
public String paymentInfo_TimeOut(Integer id) {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "线程:" + Thread.currentThread().getName() + "paymentInfo_TimeOut,id: " + id + "\t" + "O(∩_∩)O,耗费3秒";
}
}
controller
@RestController
@Slf4j
public class PaymentController
{
@Autowired
private PaymentService paymentService;
@Value("${server.port}")
private String serverPort;
@GetMapping("/payment/hystrix/ok/{id}")
public String paymentInfo_OK(@PathVariable("id") Integer id)
{
String result = paymentService.paymentInfo_OK(id);
log.info("****result: "+result);
return result;
}
@GetMapping("/payment/hystrix/timeout/{id}")
public String paymentInfo_TimeOut(@PathVariable("id") Integer id) throws InterruptedException
{
String result = paymentService.paymentInfo_TimeOut(id);
log.info("****result: "+result);
return result;
}
}
# 服务消费方
依赖
<!--openfeign-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<!--hystrix-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
</dependency>
<!--eureka client-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
配置
server:
port: 80
eureka:
client:
register-with-eureka: false
service-url:
defaultZone: http://eureka7001.com:7001/eureka/
ribbon:
ReadTimeout: 5000
ConnectionTimeout: 5000
启动类
@SpringBootApplication
@EnableFeignClients
public class OrderHystrixMain80
{
public static void main(String[] args)
{
SpringApplication.run(OrderHystrixMain80.class,args);
}
}
service
@Component
@FeignClient(value = "CLOUD-PROVIDER-HYSTRIX-PAYMENT")
public interface PaymentHystrixService
{
@GetMapping("/payment/hystrix/ok/{id}")
String paymentInfo_OK(@PathVariable("id") Integer id);
@GetMapping("/payment/hystrix/timeout/{id}")
String paymentInfo_TimeOut(@PathVariable("id") Integer id);
}
controller
@RestController
@Slf4j
public class OrderHystirxController
{
@Resource
private PaymentHystrixService paymentHystrixService;
@GetMapping("/consumer/payment/hystrix/ok/{id}")
public String paymentInfo_OK(@PathVariable("id") Integer id)
{
String result = paymentHystrixService.paymentInfo_OK(id);
return result;
}
@GetMapping("/consumer/payment/hystrix/timeout/{id}")
public String paymentInfo_TimeOut(@PathVariable("id") Integer id)
{
String result = paymentHystrixService.paymentInfo_TimeOut(id);
return result;
}
}
压力测试http://localhost/consumer/payment/hystrix/ok/1
请求卡顿。
- 8001同一层次的其它接口服务被困死,因为tomcat线程池里面的工作线程已经被挤占完毕
- 80此时调用8001,客户端访问响应缓慢,转圈圈
如何解决
- 对方服务(8001)超时了,调用者(80)不能一直卡死等待,必须有服务降级
- 对方服务(8001)宕机了,调用者(80)不能一直卡死等待,必须有服务降级
- 对方服务(8001)OK,调用者(80)自己出故障或有自我要求(自己的等待时间小于服务提供者),自己处理降级
# 服务降级
设置自身调用超时时间的峰值,峰值内可以正常运行,超过了需要有兜底的方法处理,作服务降级fallback
# 服务提供方
修改服务提供方业务类
/**
* 超时访问,演示降级
*
* @param id
* @return
*/
@HystrixCommand(fallbackMethod = "paymentInfo_TimeOutHandler",commandProperties = {
// 超时时间
@HystrixProperty(name="execution.isolation.thread.timeoutInMilliseconds",value="3000")
})
public String paymentInfo_TimeOut(Integer id) {
int second = 5;
try { TimeUnit.SECONDS.sleep(second); } catch (InterruptedException e) { e.printStackTrace(); }
return "线程:"+Thread.currentThread().getName()+"paymentInfo_TimeOut,id: "+id+"\t"+"O(∩_∩)O,耗费秒: "+second;
}
/**
* 兜底方法
*
* @param id
* @return
*/
public String paymentInfo_TimeOutHandler(Integer id){
return "/(ㄒoㄒ)/调用远程接口超时或异常:\t"+ "\t当前线程名字" + Thread.currentThread().getName();
}
一旦调用服务方法失败并抛出了错误信息后,会自动调用@HystrixCommand
标注好的fallbackMethod调用类中的指定方法
启动类添加注解@EnableCircuitBreaker
@SpringBootApplication
@EnableEurekaClient
// 开启断路器
@EnableCircuitBreaker
public class PaymentHystrixMain8001
{
public static void main(String[] args)
{
SpringApplication.run(PaymentHystrixMain8001.class,args);
}
}
访问测试http://localhost:80/consumer/payment/hystrix/timeout/1
/(ㄒoㄒ)/调用远程接口超时或异常: 当前线程名字HystrixTimer-1
线程名字HystrixTimer-1,Hystrix处理是单独的线程池
模拟运行移除int i = 1 / 0;
,也会走到_paymentInfo_TimeOutHandler_方法
# 服务消费方
虽然保证了提供方的服务降级,但是消费方还没有任何措施
添加配置
开启hystrix对feign的支持
feign:
hystrix:
enabled: true
启动类添加注解@EnableHystrix
,EnableHystrix
包含了@EnableCircuitBreaker
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@EnableCircuitBreaker
public @interface EnableHystrix {
}
@SpringBootApplication
@EnableFeignClients
@EnableHystrix
public class OrderHystrixMain80
{
public static void main(String[] args)
{
SpringApplication.run(OrderHystrixMain80.class,args);
}
}
修改业务类
@GetMapping("/consumer/payment/hystrix/timeout/{id}")
@HystrixCommand(fallbackMethod = "paymentTimeOutFallbackMethod", commandProperties = {
@HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "1500")
})
public String paymentInfo_TimeOut(@PathVariable("id") Integer id) {
return paymentHystrixService.paymentInfo_TimeOut(id);
}
public String paymentTimeOutFallbackMethod(@PathVariable("id") Integer id) {
return "我是消费者80,对方支付系统繁忙,请稍后再试或者自己运行出错请检查自己,o(╥﹏╥)o";
}
虽然有兜底方法,但是每个方法都上都要添加冗余的注解,而且兜底方法和业务方法在同一个类,代码膨胀且混乱
# 配置默认fallback方法
业务类上加注解@DefaultProperties
,需要降级的方法添加注解@HystrixCommand
默认使用@DefaultProperties
配置的降级方法
@RestController
@Slf4j
@DefaultProperties(defaultFallback = "payment_Global_FallbackMethod", commandProperties = {
@HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "5000")
})
public class OrderHystirxController {
@GetMapping("/consumer/payment/hystrix/timeout/{id}")
// @HystrixCommand(fallbackMethod = "paymentTimeOutFallbackMethod", commandProperties = {
// @HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "1500")
// })
@HystrixCommand
public String paymentInfo_TimeOut(@PathVariable("id") Integer id) {
return paymentHystrixService.paymentInfo_TimeOut(id);
}
public String paymentTimeOutFallbackMethod(@PathVariable("id") Integer id) {
return "我是消费者80,对方支付系统繁忙,请稍后再试或者自己运行出错请检查自己,o(╥﹏╥)o";
}
public String payment_Global_FallbackMethod() {
return "Global 系统繁忙,请稍后再试";
}
}
上述解决了代码冗余的问题,还剩代码混乱。远程调用失败,进行处理
# 降级实现类
新建业务降级类实现远程调用的接口
@Component
public class PaymentFallbackService implements PaymentHystrixService{
@Override
public String paymentInfo_OK(Integer id) {
return "faild paymentInfo_OK";
}
@Override
public String paymentInfo_TimeOut(Integer id) {
return "faild paymentInfo_TimeOut";
}
}
远程调用类指定降级类fallback
@Component
@FeignClient(value = "CLOUD-PROVIDER-HYSTRIX-PAYMENT", fallback = PaymentFallbackService.class)
public interface PaymentHystrixService {}
# 服务熔断
# 概述
熔断机制概述 熔断机制是应对雪崩效应的一种微服务链路保护机制。当扇出链路的某个微服务出错不可用或者响应时间太长时, 会进行服务的降级,进而熔断该节点微服务的调用,快速返回错误的响应信息。当检测到该节点微服务调用响应正常后,恢复调用链路。
在Spring Cloud框架里,熔断机制通过Hystrix实现。Hystrix会监控微服务间调用的状况,当失败的调用到一定阈值,缺省是5秒内20次调用失败,就会启动熔断机制。熔断机制的注解是@HystrixCommand
。
思想https://martinfowler.com/bliki/CircuitBreaker.html
原理https://github.com/Netflix/Hystrix/wiki/How-it-Works
# 使用
service添加测试方法
@HystrixCommand(fallbackMethod = "paymentCircuitBreaker_fallback", commandProperties = {
// 是否启动服务熔断
@HystrixProperty(name = "circuitBreaker.enabled", value = "true"),
// 请求次数
@HystrixProperty(name = "circuitBreaker.requestVolumeThreshold", value = "10"),
// 时间窗口
@HystrixProperty(name = "circuitBreaker.sleepWindowInMilliseconds", value = "10000"),
// 失败率
@HystrixProperty(name = "circuitBreaker.errorThresholdPercentage", value = "60"),
})
public String paymentCircuitBreaker(@PathVariable("id") Integer id) {
if (id < 0) {
throw new RuntimeException("******id 不能负数");
}
String serialNumber = IdUtil.simpleUUID();
return Thread.currentThread().getName() + "\t" + "调用成功,流水号: " + serialNumber;
}
public String paymentCircuitBreaker_fallback(@PathVariable("id") Integer id) {
return "id 不能负数,请稍后再试,/(ㄒoㄒ)/~~ id: " + id;
}
controller提供请求接口
@GetMapping("/payment/circuit/{id}")
public String paymentCircuitBreaker(@PathVariable("id") Integer id)
{
String result = paymentService.paymentCircuitBreaker(id);
log.info("****result: "+result);
return result;
}
涉及到断路器的三个重要参数:快照时间窗、请求总数阀值、错误百分比阀值。
- 快照时间窗:断路器确定是否打开需要统计一些请求和错误数据,而统计的时间范围就是快照时间窗,默认为最近的10秒。
- 请求总数阀值:在快照时间窗内,必须满足请求总数阀值才有资格熔断。默认为20,意味着在10秒内,如果该hystrix命令的调用次数不足20次,即使所有的请求都超时或其他原因失败,断路器都不会打开。
- 错误百分比阀值:当请求总数在快照时间窗内超过了阀值,比如发生了30次调用,如果在这30次调用中,有15次发生了超时异常,也就是超过50%的错误百分比,在默认设定50%阀值情况下,这时候就会将断路器打开。
HystrixCommand
详细配置
com/netflix/hystrix/HystrixCommandProperties.java
# 流程
断路器开闭发生的具体方式如下:
- 假设整个服务的音请求满足某个阈值 (
HystrixCommandProperties.circuitBreakerRequestVolumeThreshold()
)... - 并假设错误百分比超过阈值错误百分比(
HystrixCommandProperties.circuitBreakerErrorThresholdPercentage()
)...... - 然后断路器从 转变
CLOSED
为OPEN
。 - 当它打开时,它会将针对该断路器的所有请求短路。
- 一段时间后(
HystrixCommandProperties.circuitBreakerSleepWindowInMilliseconds()
),下一个请求被允许通过(这是HALF-OPEN
状态)。如果请求失败,断路器将OPEN
在睡眠窗口期间返回状态。如果请求成功,断路器转换到1.CLOSED
中的逻辑再次接管。
具体配置
//========================All
@HystrixCommand(fallbackMethod = "str_fallbackMethod",
groupKey = "strGroupCommand",
commandKey = "strCommand",
threadPoolKey = "strThreadPool",
commandProperties = {
// 设置隔离策略,THREAD 表示线程池 SEMAPHORE:信号池隔离
@HystrixProperty(name = "execution.isolation.strategy", value = "THREAD"),
// 当隔离策略选择信号池隔离的时候,用来设置信号池的大小(最大并发数)
@HystrixProperty(name = "execution.isolation.semaphore.maxConcurrentRequests", value = "10"),
// 配置命令执行的超时时间
@HystrixProperty(name = "execution.isolation.thread.timeoutinMilliseconds", value = "10"),
// 是否启用超时时间
@HystrixProperty(name = "execution.timeout.enabled", value = "true"),
// 执行超时的时候是否中断
@HystrixProperty(name = "execution.isolation.thread.interruptOnTimeout", value = "true"),
// 执行被取消的时候是否中断
@HystrixProperty(name = "execution.isolation.thread.interruptOnCancel", value = "true"),
// 允许回调方法执行的最大并发数
@HystrixProperty(name = "fallback.isolation.semaphore.maxConcurrentRequests", value = "10"),
// 服务降级是否启用,是否执行回调函数
@HystrixProperty(name = "fallback.enabled", value = "true"),
// 是否启用断路器
@HystrixProperty(name = "circuitBreaker.enabled", value = "true"),
// 该属性用来设置在滚动时间窗中,断路器熔断的最小请求数。例如,默认该值为 20 的时候,
// 如果滚动时间窗(默认10秒)内仅收到了19个请求, 即使这19个请求都失败了,断路器也不会打开。
@HystrixProperty(name = "circuitBreaker.requestVolumeThreshold", value = "20"),
// 该属性用来设置在滚动时间窗中,表示在滚动时间窗中,在请求数量超过
// circuitBreaker.requestVolumeThreshold 的情况下,如果错误请求数的百分比超过50,
// 就把断路器设置为 "打开" 状态,否则就设置为 "关闭" 状态。
@HystrixProperty(name = "circuitBreaker.errorThresholdPercentage", value = "50"),
// 该属性用来设置当断路器打开之后的休眠时间窗。 休眠时间窗结束之后,
// 会将断路器置为 "半开" 状态,尝试熔断的请求命令,如果依然失败就将断路器继续设置为 "打开" 状态,
// 如果成功就设置为 "关闭" 状态。
@HystrixProperty(name = "circuitBreaker.sleepWindowinMilliseconds", value = "5000"),
// 断路器强制打开
@HystrixProperty(name = "circuitBreaker.forceOpen", value = "false"),
// 断路器强制关闭
@HystrixProperty(name = "circuitBreaker.forceClosed", value = "false"),
// 滚动时间窗设置,该时间用于断路器判断健康度时需要收集信息的持续时间
@HystrixProperty(name = "metrics.rollingStats.timeinMilliseconds", value = "10000"),
// 该属性用来设置滚动时间窗统计指标信息时划分"桶"的数量,断路器在收集指标信息的时候会根据
// 设置的时间窗长度拆分成多个 "桶" 来累计各度量值,每个"桶"记录了一段时间内的采集指标。
// 比如 10 秒内拆分成 10 个"桶"收集这样,所以 timeinMilliseconds 必须能被 numBuckets 整除。否则会抛异常
@HystrixProperty(name = "metrics.rollingStats.numBuckets", value = "10"),
// 该属性用来设置对命令执行的延迟是否使用百分位数来跟踪和计算。如果设置为 false, 那么所有的概要统计都将返回 -1。
@HystrixProperty(name = "metrics.rollingPercentile.enabled", value = "false"),
// 该属性用来设置百分位统计的滚动窗口的持续时间,单位为毫秒。
@HystrixProperty(name = "metrics.rollingPercentile.timeInMilliseconds", value = "60000"),
// 该属性用来设置百分位统计滚动窗口中使用 “ 桶 ”的数量。
@HystrixProperty(name = "metrics.rollingPercentile.numBuckets", value = "60000"),
// 该属性用来设置在执行过程中每个 “桶” 中保留的最大执行次数。如果在滚动时间窗内发生超过该设定值的执行次数,
// 就从最初的位置开始重写。例如,将该值设置为100, 滚动窗口为10秒,若在10秒内一个 “桶 ”中发生了500次执行,
// 那么该 “桶” 中只保留 最后的100次执行的统计。另外,增加该值的大小将会增加内存量的消耗,并增加排序百分位数所需的计算时间。
@HystrixProperty(name = "metrics.rollingPercentile.bucketSize", value = "100"),
// 该属性用来设置采集影响断路器状态的健康快照(请求的成功、 错误百分比)的间隔等待时间。
@HystrixProperty(name = "metrics.healthSnapshot.intervalinMilliseconds", value = "500"),
// 是否开启请求缓存
@HystrixProperty(name = "requestCache.enabled", value = "true"),
// HystrixCommand的执行和事件是否打印日志到 HystrixRequestLog 中
@HystrixProperty(name = "requestLog.enabled", value = "true"),
},
threadPoolProperties = {
// 该参数用来设置执行命令线程池的核心线程数,该值也就是命令执行的最大并发量
@HystrixProperty(name = "coreSize", value = "10"),
// 该参数用来设置线程池的最大队列大小。当设置为 -1 时,线程池将使用 SynchronousQueue 实现的队列,
// 否则将使用 LinkedBlockingQueue 实现的队列。
@HystrixProperty(name = "maxQueueSize", value = "-1"),
// 该参数用来为队列设置拒绝阈值。 通过该参数, 即使队列没有达到最大值也能拒绝请求。
// 该参数主要是对 LinkedBlockingQueue 队列的补充,因为 LinkedBlockingQueue
// 队列不能动态修改它的对象大小,而通过该属性就可以调整拒绝请求的队列大小了。
@HystrixProperty(name = "queueSizeRejectionThreshold", value = "5"),
}
)
public String strConsumer() {
return "hello hystrix";
}
public String str_fallbackMethod()
{
return "*****fall back str_fallbackMethod";
}
# 服务监控
除了隔离依赖服务的调用以外,Hystrix还提供了准实时的调用监控(Hystrix Dashboard),Hystrix会持续地记录所有通过Hystrix发起的请求的执行信息,并以统计报表和图形的形式展示给用户,包括每秒执行多少请求多少成功,多少失败等。Netflix通过hystrix-metrics-event-stream项目实现了对以上指标的监控。Spring Cloud也提供了Hystrix Dashboard的整合,对监控内容转化成可视化界面。
依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix-dashboard</artifactId>
</dependency>
<!-- 被监控的服务需要导入此依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
配置
server:
port: 9001
启动类添加注解@EnableHystrixDashboard
@SpringBootApplication
@EnableHystrixDashboard
public class HystrixDashboardMain9001
{
public static void main(String[] args)
{
SpringApplication.run(HystrixDashboardMain9001.class,args);
}
}
注意:新版本Hystrix需要在主启动类MainAppHystrix8001(被监控服务)中指定监控路径
/**
*此配置是为了服务监控而配置,与服务容错本身无关,springcloud升级后的坑
*ServletRegistrationBean因为springboot的默认路径不是"/hystrix.stream",
*只要在自己的项目里配置上下面的servlet就可以了
*/
@Bean
public ServletRegistrationBean getServlet() {
HystrixMetricsStreamServlet streamServlet = new HystrixMetricsStreamServlet();
ServletRegistrationBean registrationBean = new ServletRegistrationBean(streamServlet);
registrationBean.setLoadOnStartup(1);
registrationBean.addUrlMappings("/hystrix.stream");
registrationBean.setName("HystrixMetricsStreamServlet");
return registrationBean;
}
浏览器测试http://localhost:9001/hystrix,添加要监控的服务http://localhost:8001/hystrix.stream
# Gateway路由网关
Gateway是在Spring生态系统之上构建的API网关服务,基于Spring 5,Spring Boot 2和 Project Reactor等技术。 Gateway旨在提供一种简单而有效的方式来对API进行路由,以及提供一些强大的过滤器功能, 例如:熔断、限流、重试等
SpringCloud Gateway 使用的Webflux中的reactor-netty响应式编程组件,底层使用了Netty通讯框架。
# 核心概念
- Route:网关的基本构建块。它由 ID、目标 URI、断言集合和过滤器集合定义。如果聚合断言为真,则匹配路由。
- Predicate:这是一个Java 8 函数断言 (opens new window)。输入类型是Spring Framework (opens new window)
[ServerWebExchange](https://docs.spring.io/spring/docs/5.0.x/javadoc-api/org/springframework/web/server/ServerWebExchange.html)
。这使您可以匹配来自 HTTP 请求的任何内容,例如标头或参数。 - Filter :这些是使用特定工厂构建的Spring Framework (opens new window)
[GatewayFilter](https://docs.spring.io/spring/docs/5.0.x/javadoc-api/org/springframework/web/server/GatewayFilter.html)
实例。在这里,您可以在发送下游请求之前或之后修改请求和响应。
客户端向 Spring Cloud Gateway 发出请求。如果网关处理程序映射确定请求与路由匹配,则将其发送到网关 Web 处理程序。此处理程序通过特定于请求的过滤器链运行请求。过滤器用虚线划分的原因是过滤器可以在发送代理请求之前和之后运行逻辑。执行所有“预”过滤器逻辑。然后发出代理请求。发出代理请求后,将运行“发布”过滤器逻辑。
# 使用
依赖
<!--gateway-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
<!--eureka-client-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
配置
server:
port: 9527
spring:
application:
name: cloud-gateway
eureka:
instance:
hostname: cloud-gateway-service
client: #服务提供者provider注册进eureka服务列表内
service-url:
register-with-eureka: true
fetch-registry: true
defaultZone: http://eureka7001.com:7001/eureka
启动类
@SpringBootApplication
@EnableEurekaClient
public class GateWayMain9527
{
public static void main(String[] args)
{
SpringApplication.run(GateWayMain9527.class,args);
}
}
# 路由
# 配置文件
spring:
cloud:
gateway:
enabled: true
routes:
# 路由id,随便写,要求唯一,建议配合服务名
- id: payment_routh
# 匹配成功后提供的微服务地址
uri: http://localhost:8001
# 匹配规则
predicates:
- Path=/payment/get/**
- id: payment_routh2
uri: http://localhost:8001
predicates:
- Path=/payment/lb/**
之前http://localhost:8001/payment/get/1
现在http://localhost:9527/payment/get/1
# 配置类
通过本地网关实现访问网易新闻
@Configuration
public class GatewayConfig {
@Bean
public RouteLocator routeLocator(RouteLocatorBuilder builder) {
return builder.routes()
.route("id_news163_domestic", predicateSpec -> predicateSpec.path("/domestic").uri("https://news.163.com"))
.route("id_news163_world", predicateSpec -> predicateSpec.path("/world").uri("https://news.163.com"))
.build();
}
}
# 动态路由
上面的路由地址都是固定的,我们需要动态路由到不同的机器上。
启动2台相同服务名的不同实例。修改配置文件
spring:
application:
name: cloud-gateway
cloud:
gateway:
discovery:
locator:
# 开启从服务注册中心动态创建路由的功能,使用微服务名称进行路由
enabled: true
enabled: true
routes:
# 路由id,随便写,要求唯一,建议配合服务名
- id: payment_routh
# 匹配成功后提供的微服务地址
# uri: http://localhost:8001
# 不再是http协议,采用lb负载均衡
uri: lb://cloud-payment-service
# 匹配规则
predicates:
- Path=/payment/get/**
- id: payment_routh2
# uri: http://localhost:8001
uri: lb://cloud-payment-service
predicates:
- Path=/payment/lb/**
需要注意的是uri的协议为lb,表示启用Gateway的负载均衡功能。
lb://serviceName
是spring cloud gateway在微服务中自动为我们创建的负载均衡uri
测试http://localhost:9527/payment/lb
# 断言
2022-01-24 17:24:49.268 INFO 50136 --- [ restartedMain] o.s.c.g.r.RouteDefinitionRouteLocator : Loaded RoutePredicateFactory [After]
2022-01-24 17:24:49.268 INFO 50136 --- [ restartedMain] o.s.c.g.r.RouteDefinitionRouteLocator : Loaded RoutePredicateFactory [Before]
2022-01-24 17:24:49.268 INFO 50136 --- [ restartedMain] o.s.c.g.r.RouteDefinitionRouteLocator : Loaded RoutePredicateFactory [Between]
2022-01-24 17:24:49.268 INFO 50136 --- [ restartedMain] o.s.c.g.r.RouteDefinitionRouteLocator : Loaded RoutePredicateFactory [Cookie]
2022-01-24 17:24:49.268 INFO 50136 --- [ restartedMain] o.s.c.g.r.RouteDefinitionRouteLocator : Loaded RoutePredicateFactory [Header]
2022-01-24 17:24:49.268 INFO 50136 --- [ restartedMain] o.s.c.g.r.RouteDefinitionRouteLocator : Loaded RoutePredicateFactory [Host]
2022-01-24 17:24:49.268 INFO 50136 --- [ restartedMain] o.s.c.g.r.RouteDefinitionRouteLocator : Loaded RoutePredicateFactory [Method]
2022-01-24 17:24:49.268 INFO 50136 --- [ restartedMain] o.s.c.g.r.RouteDefinitionRouteLocator : Loaded RoutePredicateFactory [Path]
2022-01-24 17:24:49.268 INFO 50136 --- [ restartedMain] o.s.c.g.r.RouteDefinitionRouteLocator : Loaded RoutePredicateFactory [Query]
2022-01-24 17:24:49.268 INFO 50136 --- [ restartedMain] o.s.c.g.r.RouteDefinitionRouteLocator : Loaded RoutePredicateFactory [ReadBodyPredicateFactory]
2022-01-24 17:24:49.268 INFO 50136 --- [ restartedMain] o.s.c.g.r.RouteDefinitionRouteLocator : Loaded RoutePredicateFactory [RemoteAddr]
2022-01-24 17:24:49.268 INFO 50136 --- [ restartedMain] o.s.c.g.r.RouteDefinitionRouteLocator : Loaded RoutePredicateFactory [Weight]
2022-01-24 17:24:49.268 INFO 50136 --- [ restartedMain] o.s.c.g.r.RouteDefinitionRouteLocator : Loaded RoutePredicateFactory [CloudFoundryRouteService]
- After Route Predicates日期时间路由,请求发起时间需在配置时间之后
spring:
cloud:
gateway:
routes:
- id: after_route
uri: https://example.org
predicates:
- After=2020-02-05T15:10:03.685+08:00[Asia/Shanghai]
- Before Route Predicate日期时间路由,请求发起时间需在配置时间之前
spring:
cloud:
gateway:
routes:
- id: after_route
uri: https://example.org
predicates:
- After=2022-02-05T15:10:03.685+08:00[Asia/Shanghai]
- Between Route Predicate日期时间路由,请求发起时间需在配置时间之间
spring:
cloud:
gateway:
routes:
- id: between_route
uri: https://example.org
predicates:
- Between=2020-02-05T15:10:03.685+08:00[Asia/Shanghai], After=2022-02-05T15:10:03.685+08:00[Asia/Shanghai]
- Cookie Route Predicate通过cookie进行路由,cookie名称和匹配的正则表达式
spring:
cloud:
gateway:
routes:
- id: cookie_route
uri: https://example.org
predicates:
- Cookie=username, starry
- Header Route Predicate请求头路由,header名称和正则表达式进行匹配
spring:
cloud:
gateway:
routes:
- id: header_route
uri: https://example.org
predicates:
- Header=X-Request-Id, \d+
- Host Route Predicate主机路由
spring:
cloud:
gateway:
routes:
- id: host_route
uri: https://example.org
predicates:
- Host=**.somehost.org,**.anotherhost.org
- Method Route Predicate请求方法路由
spring:
cloud:
gateway:
routes:
- id: method_route
uri: https://example.org
predicates:
- Method=GET,POST
- Path Route Predicate请求路径路由
spring:
cloud:
gateway:
routes:
- id: host_route
uri: https://example.org
predicates:
- Path=/red/**,/blue/**
- Query Route Predicate查询参数路由,支持正则表达式
spring:
cloud:
gateway:
routes:
- id: query_route
uri: https://example.org
predicates:
- Query=username
- RemoteAddr Route Predicate远程地址路由
spring:
cloud:
gateway:
routes:
- id: remoteaddr_route
uri: https://example.org
predicates:
- RemoteAddr=192.168.1.1/24
- Weight Route Predicate权重路由
spring:
cloud:
gateway:
routes:
# 这条路线将80%的流量转发到 weighthigh.org,20%的流量转发到 weighlow.org
- id: weight_high
uri: https://weighthigh.org
predicates:
- Weight=group1, 8
- id: weight_low
uri: https://weightlow.org
predicates:
- Weight=group1, 2
# 过滤器
filters
routes:
# 路由id,随便写,要求唯一,建议配合服务名
- id: payment_routh
# 匹配成功后提供的微服务地址
# uri: http://localhost:8001
# 不再是http协议,采用lb负载均衡
uri: lb://cloud-payment-service
# 匹配规则
predicates:
- Path=/payment/get/**
- Method=GET,POST
filters:
# 在请求头上添加参数
- AddRequestHeader=X-Request-Id,1000
自定义过滤器
当一个请求匹配一个路由时,过滤网处理程序会将 GlobalFilter
的所有实例和 GatewayFilter 的所有特定于路由的实例添加到一个过滤器链。这个组合过滤器链按照 org.springframework.core 排序。Ordered
接口,可以通过实现 ==getOrder ()==方法设置该接口。
由于 Spring Cloud Gateway 区分了过滤器逻辑执行的“前”和“后”阶段,具有最高优先级的过滤器是“前”阶段中的第一个,而“后”阶段中的最后一个。
能干吗?全局日志记录,统计网关鉴权...
编码
@Component
@Slf4j
public class MyGlobalFilter implements GlobalFilter, Ordered {
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
log.info("{} | {}",new Date(),"执行了自定义全局过滤MyGlobalFilter");
// 模拟获取token
String token = exchange.getRequest().getHeaders().getFirst("token");
if (StrUtil.isBlank(token)) {
// token为空,返回失败信息
log.error("token为空,认证失败");
exchange.getResponse().setStatusCode(HttpStatus.NOT_ACCEPTABLE);
return exchange.getResponse().setComplete();
}
// token有值,此过滤器通过,委托给下一个filter
return chain.filter(exchange);
}
/**
* 数值越低越先执行
* 数值越高越后执行
*
* @return
*/
@Override
public int getOrder() {
return -1;
}
}
2022-01-25 11:01:42.441 INFO 42016 --- [ctor-http-nio-4] c.s.springcloud.filter.MyGlobalFilter : Tue Jan 25 11:01:42 CST 2022 | 执行了自定义全局过滤MyGlobalFilter
2022-01-25 11:01:42.441 ERROR 42016 --- [ctor-http-nio-4] c.s.springcloud.filter.MyGlobalFilter : token为空,认证失败
# 跨域
CORS Configuration
spring:
cloud:
gateway:
globalcors:
corsConfigurations:
# 对于所有 GET 请求的路径,允许来自 docs.spring.io 的请求发出 CORS 请求。
'[/**]':
allowedOrigins: "https://docs.spring.io"
allowedMethods:
- GET
若要为某些网关路由断言不能处理的请求提供相同的 CORS 配置,请将 spring.cloud.gateway.globalcors.add-To-simple-url-handler-mapping
属性设置为 true
。当您尝试支持 CORS 预检请求并且您的路由断言不能计算为 true 时,这是非常有用的,因为 HTTP 方法是选项。
# 常见配置
可以在application.properties
文件内、文件内application.yml
或作为命令行开关指定各种属性。本附录提供了常见 Spring Cloud Gateway 属性的列表以及对使用它们的底层类的引用。
# SpringCloud Config配置中心
# 简介
Spring Cloud Config 为分布式系统中的外部化配置提供服务器端和客户端支持。使用 Config Server,您可以集中管理所有环境中应用程序的外部属性。客户端和服务器上的概念与 SpringEnvironment
和PropertySource
抽象,因此它们非常适合 Spring 应用程序,但可以与以任何语言运行的任何应用程序一起使用。当应用程序通过部署管道从开发到测试再进入生产时,您可以管理这些环境之间的配置,并确保应用程序在迁移时具备运行所需的一切。服务器存储后端的默认实现使用 git,因此它可以轻松支持配置环境的标记版本,并且可以访问用于管理内容的各种工具。很容易添加替代实现并将它们插入到 Spring 配置中。
# 作用
- 集中管理配置文件
- 不同环境不同配置,动态化的配置更新,分环境部署比如dev/test/prod/beta/release
- 运行期间动态调整配置,不再需要在每个服务部署的机器上编写配置文件,服务会向配置中心统一拉取配置自己的信息
- 当配置发生变动时,服务不需要重启即可感知到配置的变化并应用新的配置
- 将配置信息以REST接口的形式暴露(post、curl访问刷新均可......)
# config server
依赖
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-config-server</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
</dependencies>
git 目录
配置文件
server:
port: 3344
spring:
application:
name: cloud-config-center
cloud:
config:
server:
git:
uri: https://gitee.com/fanxingweb/spring-cloud-config.git
# 默认分支
default-label: master
# 搜索目录
search-paths:
- spring-cloud-config
eureka:
client:
service-url:
defaultZone: http://eureka7001.com:7001/eureka
启动类添加注解 @EnableConfigServer
@SpringBootApplication
@EnableConfigServer
public class ConfigCenterMain3344 {
public static void main(String[] args) {
SpringApplication.run(ConfigCenterMain3344.class, args);
}
}
修改 hosts 文件
127.0.0.1 config-3344.com
测试 http://config-3344.com:3344/master/config-test.yml (opens new window)
config:
info: master branch,spring-cloud-config/config-test.yml version=1
访问方式
/{application}/{profile}[/{label}]
/{application}-{profile}.yml
/{label}/{application}-{profile}.yml
/{application}-{profile}.properties
/{label}/{application}-{profile}.properties
- http://config-3344.com:3344/config/dev.yml【返回 Json】
- http://config-3344.com:3344/config-dev.yml【原样返回】
- http://config-3344.com:3344/dev/config-dev.yml【原样返回】
application:文件名
profile:环境(dev/test/prod)
label:分支
# config client
依赖
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-config-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
</dependencies>
配置文件 bootstrap.yml
server:
port: 3355
spring:
application:
name: config-client
cloud:
config:
# 分支
label: master
# 文件名
name: config
# 环境
profile: dev
# 配置服务器端地址
uri: http://config-3344.com:3344
eureka:
client:
service-url:
defaultZone: http://localhost:7001/eureka
启动类
@EnableDiscoveryClient
@SpringBootApplication
public class ConfigClientMain3355 {
public static void main(String[] args) {
SpringApplication.run(ConfigClientMain3355.class, args);
}
}
测试类
@RestController
public class ConfigClientController {
@Value("${config.info}")
private String configInfo;
@GetMapping("/configInfo")
public String getConfigInfo() {
return configInfo;
}
}
访问测试 http://localhost:3355/configInfo (opens new window)
master branch,spring-cloud-config/config-dev.yml version=1
成功读到到配置。修改 git 上的配置,server 读取的最新数据,client 读取的还是旧配置。
# 动态刷新
需要导入监控依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
修改配置文件
# 暴露监控端点
management:
endpoints:
web:
exposure:
include: "*"
在需要刷新的 bean 的类上添加 @RefreshScope
注解。(将@Bean定义放入refresh scope的方便注释。以这种方式注释的 Bean 可以在运行时刷新,任何使用它们的组件都将在下一次方法调用时获得一个新实例,完全初始化并注入所有依赖项。)
@RestController
@RefreshScope
public class ConfigClientController {
@Value("${config.info}")
private String configInfo;
@GetMapping("/configInfo")
public String getConfigInfo() {
return configInfo;
}
}
重启 client,git 上修改配置,向 client 发送一条 post 请求(告诉他需要刷新 bean 了)
curl -X POST "http://localhost:3355/actuator/refresh"
但是这样只能一个一个通知,非常繁琐…(于是来到了SpringCloud Bus)
# SpringCloud Bus消息总线
https://cloud.spring.io/spring-cloud-bus/reference/html/ (opens new window)
# 简介
Spring Cloud Bus 将分布式系统的节点与轻量级消息代理链接起来。然后可以使用此代理来广播状态更改(例如配置更改)或其他管理指令。一个关键的想法是,总线就像一个分布式执行器,用于横向扩展的 Spring Boot 应用程序。但是,它也可以用作应用程序之间的通信渠道。该项目为 AMQP 代理或 Kafka 提供启动器作为传输。
Spring Cloud Bus 配合 Spring Cloud Config 使用可以实现配置的动态刷新。
设计思想
- 利用消息总线触发一个客户端 configClient 的 /bus/refresh,而刷新所有客户端的配置
- 利用消息总线触发一个服务端 ConfigServer 的/ bus/refresh端点,而刷新所有客户端的配置’
第一种不太合适:
- 打破了微服务的职责单一性。微服务本身是业务模块,它不应该承担配置刷新的职责。
- 破坏了微服务各节点的对等性。假设他是集群,一个实例要承担刷新职责,其他实例不用承担刷新职责。
- 有一定的局限性。例如,微服务在迁移时,它的网络地址常常会发生变化,此时如果想要做到自动刷新,那就会增加更多的修改
如果 Spring Cloud Bus 在类路径中检测到自己,则通过添加 Spring Boot 自动配置来工作。要启用总线,请将spring-cloud-starter-bus-amqp
或 添加spring-cloud-starter-bus-kafka
到您的依赖项管理。Spring Cloud 负责其余的工作。确保代理(RabbitMQ 或 Kafka)可用且已配置。在 localhost 上运行时,您无需执行任何操作。如果您远程运行,请使用 Spring Cloud 连接器或 Spring Boot 约定来定义代理凭据,
# 全局广播
复制一个config client,便于模拟广播
# config server
依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
配置文件
server:
port: 3344
spring:
application:
name: cloud-config-center
cloud:
config:
server:
git:
uri: https://gitee.com/fanxingweb/spring-cloud-config.git
default-label: master
search-paths:
- spring-cloud-config
# 添加 mq 的配置
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
eureka:
client:
service-url:
defaultZone: http://eureka7001.com:7001/eureka
management:
# 暴露bus刷新配置的端点
endpoints:
web:
exposure:
include: 'bus-refresh'
# config client
依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
配置文件
server:
port: 3355
spring:
application:
name: config-client
cloud:
config:
label: master
name: config
profile: dev
uri: http://config-3344.com:3344
# mq 配置
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
eureka:
client:
service-url:
defaultZone: http://localhost:7001/eureka
# 暴露监控端点
management:
endpoints:
web:
exposure:
include: "*"
测试
更新 git 上的配置文件,发送请求
curl -X POST "http://config-3344.com:3344/actuator/bus-refresh"
http://config-3344.com:3344/master/config-dev.yml (opens new window)
http://localhost:3355/configInfo (opens new window)
http://localhost:3366/configInfo (opens new window)
config server 和 config client 都是最新的数据了。一次发送,处处更新。
# 定点通知
应用程序的每个实例都有一个服务 ID,它的值可以设置, spring.cloud.bus.id
并且它的值应该是一个以冒号分隔的标识符列表,从最不具体到最具体。默认值是从环境构造的**spring.application.name**
和 **server.port**
(或**spring.application.index**
,如果设置)的组合。ID 的默认值以 的形式构造app:index:id
,其中:
app
是vcap.application.name
,如果它存在,或者spring.application.name
index
是vcap.application.instance_index
, 如果存在,spring.application.index
,local.server.port
,server.port
, 或0
(按顺序)。id
是vcap.application.instance_id
(如果存在)或随机值。
http://host:port/actuator/bus-refresh/{destination}
HTTP 端点接受“destination”路径参数,例如 /bus-refresh/customers:9000
,其中destination
是服务 ID。如果 ID 由总线上的一个实例拥有,它会处理该消息,而所有其他实例都会忽略它。
比如:只给 client 3355 发消息,不给 client 3366 发消息
destination 可以理解为:配置文件的 spring.application.name:
+ server.port
curl -X POST "http://localhost:3344/actuator/bus-refresh/config-client:3355"
# SpringCloud Stream消息驱动
# 简介
Spring Cloud Stream 是一个框架,用于构建与共享消息系统连接的高度可扩展的事件驱动微服务。
该框架提供了一个灵活的编程模型,该模型基于已经建立和熟悉的 Spring 习惯用法和最佳实践,包括对持久 pub/sub 语义、消费者组和有状态分区的支持。
Spring Cloud Stream 的核心构建块是:
- 目标绑定器:负责提供与外部消息传递系统集成的组件。
- 目的地绑定:外部消息系统和最终用户提供的应用程序代码(生产者/消费者)之间的桥梁。
- 消息:生产者和消费者用来与目标绑定器(以及通过外部消息系统的其他应用程序)进行通信的规范数据结构。
简单来说就是:屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型
比方说我们用到了RabbitMQ和Kafka,由于这两个消息中间件的架构上的不同, 像RabbitMQ有exchange,kafka有Topic和Partitions分区
这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,我想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候springcloud Stream给我们提供了一种解耦合的方式。
通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。
Binder可以生成Binding,Binding用来绑定消息容器的生产者和消费者,它有两种类型,INPUT和OUTPUT,INPUT对应于消费者,OUTPUT对应于生产者。
Stream中的消息通信方式遵循了发布-订阅模式,Topic主题进行广播
- 在RabbitMQ就是Exchange
- 在Kakfa中就是Topic
# 常用套路
- Binder:很方便的连接中间件,屏蔽差异
- Channel:通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置
- Source和Sink:简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入。
# 常用API
# 生产者
依赖
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-netflix-eureka-client</artifactId>
</dependency>
</dependencies>
配置文件(参考https://www.springcloud.cc/spring-cloud-greenwich.html#multiple-systems)
server:
port: 8801
spring:
application:
name: cloud-stream-provider
cloud:
stream:
# 绑定 提供服务的mq信息(配置)
# org/springframework/cloud/stream/config/BinderProperties.java
binders:
# 自定义mq名称
rabbit1:
# 类型 rabbit kafka
type: rabbit
# 环境配置
environment:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
# 绑定整合 将mq和 输入/输出 进行绑定(整合)
bindings:
# 自定义通道名称
output:
# topic目的地,rabbit中是exchange
destination: myExchange
# 消息类型
content-type: application/json
# 和哪个mq进行绑定
binder: rabbit1
eureka:
client:
service-url:
defaultZone: http://localhost:7001/eureka
instance:
# 心跳间隔时间 默认30s
lease-renewal-interval-in-seconds: 2
# 超时间隔 默认90s
lease-expiration-duration-in-seconds: 5
# 实例id(显示主机名称)
instance-id: send-8801.com
# 访问路径为ip
prefer-ip-address: true
启动类
@SpringBootApplication
public class StreamMQMain8801 {
public static void main(String[] args) {
SpringApplication.run(StreamMQMain8801.class, args);
}
}
发送消息接口
public interface IMessageProvider
{
String send() ;
}
发送消息实现类
// 声明为输出接口,和配置的exchange进行绑定
@EnableBinding(Source.class)
public class MessageProviderImpl implements IMessageProvider{
/**
* 注入消息发送的channel
*/
@Resource
private MessageChannel output;
@Override
public String send() {
String s = UUID.randomUUID().toString();
Message<String> message = MessageBuilder.withPayload(s).build();
// 发送消息
this.output.send(message);
return s;
}
}
@EnableBinding
注解将一个或多个接口作为参数(在这种情况下,该参数是单个Sink
接口)。接口声明输入和输出通道。Spring Cloud Stream提供了Source
,Sink
和Processor
接口。您也可以定义自己的接口。
如果注入类型是
MessageChannel
接口的话,需要指定Bean的名称,因为会找到3个对应的bean
- 使用
@Resource
、变量名output
- 使用
@Resource(name = "output")
- 使用
@Autowired
+@Qualifier("output")
总之要根据名字指定bean,否则会报错
No qualifying bean of type 'org.springframework.messaging.MessageChannel' available: expected single matching bean but found 3: output,nullChannel,errorChannel
controller
@RestController
public class SendMessageController
{
@Resource
private IMessageProvider messageProvider;
@GetMapping(value = "/sendMessage")
public String sendMessage()
{
return messageProvider.send();
}
}
启动项目,查看mq控制台
多了一个Exchange: myExchange,此时并没有queue绑定
访问 http://localhost:8801/sendMessage,向exchange投递消息(此时并没有queue接收,只是发送到exchange)
# 消费者
依赖
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
</dependencies>
配置
server:
port: 8802
spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders:
rabbit1:
type: rabbit
environment:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings:
# 自定义通道名称
input:
# 要使用的Exchange
destination: myExchange
content-type: application/json
binder: rabbit1
eureka:
client:
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2
lease-expiration-duration-in-seconds: 5
instance-id: receive-8802.com
prefer-ip-address: true
启动类
@SpringBootApplication
public class StreamMQMain8802
{
public static void main(String[] args)
{
SpringApplication.run(StreamMQMain8802.class,args);
}
}
消息接收类
// 绑定输入管道
@EnableBinding(Sink.class)
public class ReceiveMessageListener {
@Value("${server.port}")
private String serverPort;
/**
* 监听管道(收到消息后会调用此方法)
*
* Sink.INPUT 就是 ”input“
* 即 配置文件的
* bindings:
* # 自定义通道名称
* input:
* @param message
*/
@StreamListener(Sink.INPUT)
public void input(Message<String> message) {
System.out.println("消费者1号,=======》收到消息:" + message.getPayload() + "\t port:" + serverPort);
}
}
作为对Spring Integration支持的补充,Spring Cloud Stream提供了自己的
@StreamListener
注释,其模仿其他Spring消息注释(@MessageMapping
,@JmsListener
,@RabbitListener
等)并提供便利,例如基于内容的路由等。
启动生产者和消费者,可以看到有queue绑定了exchange,Routing key是#
范围http://localhost:8801/sendMessage,查看消费端的控制台成功输出
消费者1号,=======》收到消息:364f5cf0-6f09-49be-be7d-92ea65c859b6 port:8802
# 分组消费
复制一份消费者(除了修改端口,其他代码一样),便于模拟分组
一个生产者,两个消费者。
生产者发送一条消息http://localhost:8801/sendMessage,两个消费者都会收到消息(重复消费)
消费者1号,=======》收到消息:30c1f3df-334a-4302-9559-a435df6e16a7 port:8802
消费者2号,=======》收到消息:30c1f3df-334a-4302-9559-a435df6e16a7 port:8803
我们希望消息只会被一个消费者消费,而不是全部,这里就要使用到group。
Spring Cloud Stream通过消费者群体的概念对这种行为进行建模。(Spring Cloud Stream消费者组类似于Kafka消费者组并受其启发。)每个消费者绑定都可以使用spring.cloud.stream.bindings.<channelName>.group
属性来指定组名。对于下图所示的消费者,此属性将设置为spring.cloud.stream.bindings.<channelName>.group=hdfsWrite
或spring.cloud.stream.bindings.<channelName>.group=average
订阅给定目标的所有组都将收到已发布数据的副本,但是每个组中只有一个成员从该目标接收给定消息。默认情况下,未指定组时,Spring Cloud Stream会将应用程序分配给与所有其他使用者组具有发布-订阅关系的匿名且独立的单成员使用者组。
- 同一个组内只能有一个成员消费消息(竞争关系)
- 不同组可以消费消息
- 默认:每个目标自成一组,即都可以消费消息
为两个消费者配置相同的组group
spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders:
rabbit1:
type: rabbit
environment:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings:
# 自定义通道名称
input:
# 要使用的Exchange
destination: myExchange
content-type: application/json
binder: rabbit1
# 配置group
group: sameGroup
重启查看控制台
只有一个queue,两个消费者
持久化
- 绑定器实现确保组订阅是持久的,并且一旦为组创建了至少一个订阅,该组就会接收消息,即使它们是在组中的所有应用程序停止时发送的。
- 匿名订阅本质上是非持久的。对于某些 binder 实现(例如 RabbitMQ),可能会有非持久组订阅。
- 通常,最好在将应用程序绑定到给定目标时始终指定消费者组。当扩展 Spring Cloud Stream 应用程序时,您必须为其每个输入绑定指定一个消费者组。这样做可以防止应用程序的实例 接收重复的消息(除非需要这种行为,这是不寻常的)。
配置group会持久化,不配置(默认)group不会持久化
# SpringCloud Sleuth链路追踪
https://spring.io/projects/spring-cloud-sleuth (opens new window)
Spring Cloud Sleuth 为分布式跟踪提供 Spring Boot 自动配置。
Sleuth 配置您开始所需的一切。这包括跟踪数据(跨度)报告到哪里,要保留多少跟踪(采样),是否发送远程字段(baggage)以及跟踪哪些库。
具体来说,Spring Cloud Sleuth…
- 将跟踪和跨度 ID 添加到 Slf4J MDC,因此您可以从日志聚合器中的给定跟踪或跨度中提取所有日志。
- 检测来自 Spring 应用程序的公共入口和出口点(servlet 过滤器、rest 模板、计划操作、消息通道、feign 客户端)。
- 如果
spring-cloud-sleuth-zipkin
可用,则应用程序将通过 HTTP 生成和报告与Zipkin (opens new window)兼容的跟踪。默认情况下,它将它们发送到 localhost(端口 9411)上的 Zipkin 收集器服务。使用 配置服务的位置spring.zipkin.baseUrl
。
# zipkin
- SpringCloud从F版起已不需要自己构建Zipkin Server了,只需调用jar包即可
- 下载https://zipkin.io/pages/quickstart
- 启动jar包
- java -jar zipkin-server-2.23.16-exec.jar
- 控制台查看
# 服务提供者
之前的cloud-provider-payment8001模块,添加zipkin依赖
<!--包含了sleuth+zipkin-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-zipkin</artifactId>
</dependency>
添加配置
spring:
zipkin:
base-url: http://localhost:9411
sleuth:
sampler:
# 采样率 1 表示100%
probability: 1
添加controller
@GetMapping("/payment/zipkin")
public String paymentZipkin()
{
return "hi ,i'am paymentzipkin server fall back";
}
# 服务消费者
之前的cloud-consumer-order80模块,添加依赖
<!--包含了sleuth+zipkin-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-zipkin</artifactId>
</dependency>
添加依赖
spring:
zipkin:
base-url: http://localhost:9411
sleuth:
sampler:
# 采样率 1 表示100%
probability: 1
controller调用服务提供方
@GetMapping("/consumer/payment/zipkin")
public String paymentZipkin() {
// String result = restTemplate.getForObject("http://localhost:8001"+"/payment/zipkin/", String.class);
// 使用服务名 远程调用
String result = restTemplate.getForObject(PAYMENT_URL+"/payment/zipkin/", String.class);
return result;
}
访问http://localhost:80/consumer/payment/zipkin,查看zipkin控制台http://127.0.0.1:9411/zipkin/
# SpringCloud Alibaba
https://github.com/alibaba/spring-cloud-alibaba/blob/2.2.x/README-zh.md (opens new window)
https://spring-cloud-alibaba-group.github.io/github-pages/hoxton/zh-cn/index.html (opens new window)
Sentinel (opens new window):把流量作为切入点,从流量控制、熔断降级、系统负载保护等多个维度保护服务的稳定性。
Nacos (opens new window):一个更易于构建云原生应用的动态服务发现、配置管理和服务管理平台。
RocketMQ (opens new window):一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。
Dubbo (opens new window):Apache Dubbo™ 是一款高性能 Java RPC 框架。
Seata (opens new window):阿里巴巴开源产品,一个易于使用的高性能微服务分布式事务解决方案。
Alibaba Cloud OSS (opens new window): 阿里云对象存储服务(Object Storage Service,简称 OSS),是阿里云提供的海量、安全、低成本、高可靠的云存储服务。您可以在任何应用、任何时间、任何地点存储和访问任意类型的数据。
Alibaba Cloud SchedulerX (opens new window): 阿里中间件团队开发的一款分布式任务调度产品,提供秒级、精准、高可靠、高可用的定时(基于 Cron 表达式)任务调度服务。
Alibaba Cloud SMS (opens new window): 覆盖全球的短信服务,友好、高效、智能的互联化通讯能力,帮助企业迅速搭建客户触达通道。
更多组件请参考 Roadmap (opens new window)。
# Nacos服务注册与配置中心
https://nacos.io/zh-cn/ (opens new window)
# 简介
一个更易于构建云原生应用的动态服务发现、配置管理和服务管理平台。
Dynamic Naming and Configuration Service
nacos即Naming、Configuration、Service
Nacos就是注册中心 + 配置中心的组合
Nacos = Eureka+Config +Bus
# 下载
下载地址:https://github.com/alibaba/nacos/releases (opens new window)
解压并启动:bin目录下的startup脚本文件
由于默认是cluster
模式,需要mysql,所以我们启动时配置为单例:startup.cmd -m standalone
set CUSTOM_SEARCH_LOCATIONS=file:%BASE_DIR%/conf/
set MODE="cluster"
set FUNCTION_MODE="all"
set SERVER=nacos-server
set MODE_INDEX=-1
set FUNCTION_MODE_INDEX=-1
set SERVER_INDEX=-1
set EMBEDDED_STORAGE_INDEX=-1
set EMBEDDED_STORAGE=""
浏览器访问http://localhost:8848/nacos/
,账号密码都是_nacos_
# 服务注册中心
Nacos作为服务注册中心演示
Nacos 支持AP和CP模式的切换
何时选择使用何种模式?
- 一般来说,如果不需要存储服务级别的信息且服务实例是通过nacos-client注册,并能够保持心跳上报,那么就可以选择AP模式。当前主流的服务如 Spring cloud 和 Dubbo 服务,都适用于AP模式,AP模式为了服务的可能性而减弱了一致性,因此AP模式下只支持注册临时实例。
- 如果需要在服务级别编辑或者存储配置信息,那么 CP 是必须,K8S服务和DNS服务则适用于CP模式。CP模式下则支持注册持久化实例,此时则是以 Raft 协议为集群运行模式,该模式下注册实例之前必须先注册服务,如果服务不存在,则会返回错误。
# 服务提供者
父依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.starry</groupId>
<artifactId>cloudlearn</artifactId>
<version>1.0-SNAPSHOT</version>
<modules>
<module>cloud-provider-payment8001</module>
<module>cloud-consumer-order80</module>
<module>cloud-api-commons</module>
<module>cloud-eureka-server7001</module>
<module>cloud-eureka-server7002</module>
<module>cloud-provider-payment8002</module>
<module>cloud-provider-payment8004</module>
<module>cloud-consumerzk-order80</module>
<module>cloud-providerconsul-payment8006</module>
<module>cloud-consumerconsul-order80</module>
<module>cloud-consumer-feign-order80</module>
<module>cloud-provider-hystrix-payment8001</module>
<module>cloud-consumer-feign-hystrix-order80</module>
<module>cloud-consumer-hystrix-dashboard9001</module>
<module>cloud-gateway-gateway9527</module>
<module>cloud-config-center-3344</module>
<module>cloud-config-client-3355</module>
<module>cloud-config-client-3366</module>
<module>cloud-stream-rabbitmq-provider8801</module>
<module>cloud-stream-rabbitmq-consumer8802</module>
<module>cloud-stream-rabbitmq-consumer8803</module>
<module>cloudalibaba-provider-payment9001</module>
</modules>
<packaging>pom</packaging>
<!-- 统一管理jar包版本 -->
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<junit.version>4.12</junit.version>
<log4j.version>1.2.17</log4j.version>
<lombok.version>1.16.18</lombok.version>
<mysql.version>5.1.47</mysql.version>
<druid.version>1.1.17</druid.version>
<mybatis.spring.boot.version>1.3.1</mybatis.spring.boot.version>
</properties>
<!-- 子模块继承之后,提供作用:锁定版本+子modlue不用写groupId和version -->
<dependencyManagement>
<dependencies>
<!--spring boot 2.2.2-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>2.2.2.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!--spring cloud Hoxton.SR1-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Hoxton.SR1</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!--spring cloud alibaba 2.1.0.RELEASE-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>2.1.0.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>${druid.version}</version>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>${mybatis.spring.boot.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<optional>true</optional>
</dependency>
</dependencies>
</dependencyManagement>
</project>
项目依赖
<dependencies>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
</dependencies>
配置文件
server:
port: 9001
spring:
application:
name: nacos-payment-provider
cloud:
nacos:
discovery:
server-addr: localhost:8848
management:
endpoints:
web:
exposure:
include: '*'
启动类
@EnableDiscoveryClient
@SpringBootApplication
public class PaymentMain9001
{
public static void main(String[] args) {
SpringApplication.run(PaymentMain9001.class, args);
}
}
controller
@RestController
public class PaymentController
{
@Value("${server.port}")
private String serverPort;
@GetMapping(value = "/payment/nacos/{id}")
public String getPayment(@PathVariable("id") Integer id)
{
return "nacos registry, serverPort: "+ serverPort+"\t id"+id;
}
}
nacos控制台查看http://localhost:8848/nacos/,服务已经成功注册到nacos了
为了便于演示负载均衡,再新建一个一样的服务提供者,或者修改配置
# 服务消费者
依赖
<dependencies>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!-- 自定义api通用包,通用实体类-->
<dependency>
<groupId>com.starry</groupId>
<artifactId>cloud-api-commons</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
配置文件
server:
port: 83
spring:
application:
name: nacos-order-consumer
cloud:
nacos:
discovery:
server-addr: localhost:8848
# 要访问的服务地址(自定义标签)
service-url:
nacos-user-service: http://nacos-payment-provider
启动类
@EnableDiscoveryClient
@SpringBootApplication
public class OrderNacosMain83
{
@Bean
@LoadBalanced
RestTemplate restTemplate() {
return new RestTemplate();
}
public static void main(String[] args)
{
SpringApplication.run(OrderNacosMain83.class,args);
}
}
controller
@RestController
public class OrderNacosController
{
@Resource
private RestTemplate restTemplate;
@Value("${service-url.nacos-user-service}")
private String serverURL;
@GetMapping("/consumer/payment/nacos/{id}")
public String paymentInfo(@PathVariable("id") Long id)
{
return restTemplate.getForObject(serverURL+"/payment/nacos/"+id,String.class);
}
}
访问消费者的接口http://localhost:83/consumer/payment/nacos/1,实现负载均衡
nacos registry, serverPort: 9002 id1
nacos registry, serverPort: 9001 id1
nacos registry, serverPort: 9002 id1
nacos registry, serverPort: 9001 id1
查看nacos控制台,消费者两个实例
# 服务配置中心
配置文件
bootstrap.yml
server:
port: 3377
spring:
application:
name: nacos-config-client
cloud:
nacos:
discovery:
# nacos 服务注册中心地址
server-addr: localhost:8848
config:
# nacos 配置中心地址
server-addr: localhost:8848
# 文件扩展名
file-extension: yaml
# ${prefix}-${spring.profiles.active}.${file-extension}
# nacos-config-client-dev.yaml
application.yml
spring:
profiles:
active: dev
# 配置说明
说明:之所以需要配置 spring.application.name
,是因为它是构成 Nacos 配置管理 dataId
字段的一部分。
在 Nacos Spring Cloud 中,dataId
的完整格式如下:
${prefix}-${spring.profiles.active}.${file-extension}
prefix
默认为spring.application.name
的值,也可以通过配置项spring.cloud.nacos.config.prefix
来配置。spring.profiles.active
即为当前环境对应的 profile,详情可以参考 Spring Boot文档 (opens new window)。 **注意:当**spring.profiles.active**
为空时,对应的连接符**-**
也将不存在,dataId 的拼接格式变成 ****${prefix}.${file-extension}**
file-exetension
为配置内容的数据格式,可以通过配置项spring.cloud.nacos.config.file-extension
来配置。目前只支持properties
和yaml
类型。
通过 Spring Cloud 原生注解 @RefreshScope
实现配置自动更新
启动类
@EnableDiscoveryClient
@SpringBootApplication
public class NacosConfigClientMain3377
{
public static void main(String[] args) {
SpringApplication.run(NacosConfigClientMain3377.class, args);
}
}
controller
@RestController
@RefreshScope
public class ConfigClientController
{
@Value("${config.info}")
private String configInfo;
@GetMapping("/config/info")
public String getConfigInfo() {
return configInfo;
}
}
控制台添加配置文件
访问测试http://localhost:3377/config/info
from nacos,nacos-config-client-dev.yaml,version=1
修改nacos配置文件,再次访问,自动刷新
from nacos,nacos-config-client-dev.yaml,version=2
# Namespace
https://nacos.io/zh-cn/docs/concepts.html (opens new window)
用于进行租户粒度的配置隔离。不同的命名空间下,可以存在相同的 Group 或 Data ID 的配置。Namespace 的常用场景之一是不同环境的配置的区分隔离,例如开发测试环境和生产环境的资源(如配置、服务)隔离等。
在没有明确指定 ${spring.cloud.nacos.config.namespace}
配置的情况下, 默认使用的是 Nacos 上 Public 这个namespae。如果需要使用自定义的命名空间,可以通过以下配置来实现
spring.cloud.nacos.config.namespace=b3404bc0-d7dc-4855-b519-570ed34b62d7
该配置必须放在 bootstrap.properties 文件中。此外
spring.cloud.nacos.config.namespace
的值是 namespace 对应的 id,id 值可以在 Nacos 的控制台获取。并且在添加配置时注意不要选择其他的 namespae,否则将会导致读取不到正确的配置。
# Group
在没有明确指定 ${spring.cloud.nacos.config.group}
配置的情况下, 默认使用的是 DEFAULT_GROUP 。如果需要自定义自己的 Group,可以通过以下配置来实现:
spring.cloud.nacos.config.group=DEVELOP_GROUP
该配置必须放在 bootstrap.properties 文件中。并且在添加配置时 Group 的值一定要和
spring.cloud.nacos.config.group
的配置值一致
# 自定义 Data Id 配置
https://spring-cloud-alibaba-group.github.io/github-pages/hoxton/zh-cn/index.html# (opens new window)%E6%94%AF%E6%8C%81%E8%87%AA%E5%AE%9A%E4%B9%89%E6%89%A9%E5%B1%95%E7%9A%84_data_id%E9%85%8D%E7%BD%AE
# 集群和持久化
https://nacos.io/zh-cn/docs/cluster-mode-quick-start.html (opens new window)
Nacos默认自带的是嵌入式数据库derby,虽然可以存储数据,但是只能本地读取,不能集群间通信,我们想要集群间访问,就需要一个可以在网络中访问的数据库,可以使用Mysql
nginx + nacos*3 + mysql
# 数据库切换
- 在mysql中执行_nacos/conf/nacos-mysql.sql_文件
- 修改_nacos/conf/application.properties_配置文件
#*************** Config Module Related Configurations ***************#
### If use MySQL as datasource:
spring.datasource.platform=mysql
### Count of DB:
db.num=1
### Connect URL of DB:
db.url.0=jdbc:mysql://127.0.0.1:3306/nacos_config?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTC
db.user.0=root
db.password.0=root
- 启动nacos
# 单例模式启动,默认集群cluster
startup.cmd -m standalone
- 查看控制台,之前的配置信息都不在了。添加一条配置,查看数据库,也插入一条记录,切换数据库成功。
# 集群配置
- 修改_nacos/conf/cluster.conf_配置文件 请每行配置成ip:port。(请配置3个或3个以上节点)
# ip:port
192.168.0.3:8848
192.168.0.3:8850
192.168.0.3:8851
- 在mysql中执行_nacos/conf/nacos-mysql.sql_文件
- 修改_nacos/conf/application.properties_配置文件
- 启动每个nacos
startup.cmd -m cluster
如果是同一台集群上操作,需要使用不同的端口
nginx添加集群代理,负载均衡
upstream cluster {
server 192.168.0.3:8848;
server 192.168.0.3:8850;
server 192.168.0.3:8851;
}
server {
listen 1111;
server_name localhost;
location / {
proxy_pass http://cluster;
}
}
修改项目配置中心地址
server:
port: 3377
spring:
application:
name: nacos-config-client
cloud:
nacos:
discovery:
# nacos 服务注册中心地址(nginx地址)
server-addr: 192.168.0.3:1111
config:
# nacos 配置中心地址
server-addr: 192.168.0.3:1111
# 文件扩展名
file-extension: yaml
# ${prefix}-${spring.profiles.active}.${file-extension}
# nacos-config-client-dev.yaml
进入控制台http://192.168.0.3:1111/nacos,添加配置。访问controller,成功返回;查看数据库,成功添加。
# Sentinel服务限流与熔断
# 简介
随着微服务的流行,服务和服务之间的稳定性变得越来越重要。Sentinel 以流量为切入点,从流量控制、熔断降级、系统负载保护等多个维度保护服务的稳定性。
https://sentinelguard.io/zh-cn/docs/introduction.html (opens new window)
https://github.com/alibaba/Sentinel/wiki/介绍 (opens new window)
# 环境搭建
下载
https://github.com/alibaba/Sentinel/releases (opens new window)
注意:启动 Sentinel 控制台需要 JDK 版本为 1.8 及以上版本。
启动
java -Dserver.port=8080 -Dcsp.sentinel.dashboard.server=localhost:8080 -Dproject.name=sentinel-dashboard -jar sentinel-dashboard.jar
其中 -Dserver.port=8080
用于指定 Sentinel 控制台端口为 8080
。
从 Sentinel 1.6.0 起,Sentinel 控制台引入基本的登录功能,默认用户名和密码都是 sentinel
。可以参考 鉴权模块文档 (opens new window) 配置用户名和密码。
启动nacos
# 编码
依赖
<dependencies>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<!-- 后续做持久化用到-->
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-datasource-nacos</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
</dependencies>
配置文件
server:
port: 8401
spring:
application:
name: cloudalibaba-sentinel-service
cloud:
nacos:
discovery:
server-addr: localhost:8848
sentinel:
transport:
# Sentinel 仪表板地址
dashboard: localhost:8080
# 应用程序与 Sentinel 仪表板交互的端口。使用此端口的 HTTP 服务器将在应用程序中启动
# api地址 默认8719,占用后+1,直到端口没有被占用
port: 8719
management:
endpoints:
web:
exposure:
include: '*'
指定的端口号spring.cloud.sentinel.transport.port
将在应用程序的相应服务器上启动一个 HTTP Server,该服务器将与 Sentinel 仪表板交互。例如,如果在 Sentinel 仪表板中添加了限速规则,则规则数据将被 HTTP 服务器推送和接收,然后将规则注册到 Sentinel。
启动类
@EnableDiscoveryClient
@SpringBootApplication
public class MainApp8401
{
public static void main(String[] args) {
SpringApplication.run(MainApp8401.class, args);
}
}
controller
@RestController
public class FlowLimitController
{
@GetMapping("/testA")
public String testA()
{
return "------testA";
}
@GetMapping("/testB")
public String testB()
{
return "------testB";
}
}
启动项目测试,http://localhost:8080/#/dashboard/home查看控制台并没有服务
访问接口http://localhost:8401/testB后再次查看控制台,此时才出现服务
说明sentinel采用懒加载
# 流量控制
https://sentinelguard.io/zh-cn/docs/flow-control.html (opens new window)
重要属性:
Field | 说明 | 默认值 |
---|---|---|
resource | 资源名,资源名是限流规则的作用对象 | |
count | 限流阈值 | |
grade | 限流阈值类型,QPS 或线程数模式 | QPS 模式 |
limitApp | 流控针对的调用来源 | default |
,代表不区分调用来源 | ||
strategy | 调用关系限流策略:直接、链路、关联 | 根据资源本身(直接) |
controlBehavior | 流控效果(直接拒绝 / 排队等待 / 慢启动模式),不支持按调用关系限流 | 直接拒绝 |
同一个资源可以同时有多个限流规则。
# 流控模式
# 直接
一秒钟只能访问一次
超出指定的QPS会返回_Blocked by Sentinel (flow limiting)_
# 关联
当关联的资源达到阈值时,就限流自己;当与A关联的资源B达到阀值后,就限流A自己
当B的QPS超过1时,就限流A;B可以正常运行
一直调用B接口
- 期间手动访问B正常访问http://localhost:8401/testB
- 访问A被限流http://localhost:8401/testA Blocked by Sentinel (flow limiting)
# 链路
阈值统计时,只统计从指定资源进入当前资源的请求,是对请求来源的限流
# 流控效果
当 QPS 超过某个阈值的时候,则采取措施进行流量控制。流量控制的效果包括以下几种:直接拒绝、Warm Up、匀速排队。对应 FlowRule
中的 controlBehavior
字段。
注意:若使用除了直接拒绝之外的流量控制效果,则调用关系限流策略(strategy)会被忽略。
# 快速失败
直接拒绝(RuleConstant.CONTROL_BEHAVIOR_DEFAULT
)方式。该方式是默认的流量控制方式,当QPS超过任意规则的阈值后,新的请求就会被立即拒绝,拒绝方式为抛出FlowException
。这种方式适用于对系统处理能力确切已知的情况下,比如通过压测确定了系统的准确水位时。
直接返回_Blocked by Sentinel (flow limiting)_
com.alibaba.csp.sentinel.slots.block.flow.controller.DefaultController
# Warm Up
Warm Up(RuleConstant.CONTROL_BEHAVIOR_WARM_UP
)方式,即预热/冷启动方式。当系统长期处于低水位的情况下,当流量突然增加时,直接把系统拉升到高水位可能瞬间把系统压垮。通过"冷启动",让通过的流量缓慢增加,在一定时间内逐渐增加到阈值上限,给冷系统一个预热的时间,避免冷系统被压垮。
公式:阈值除以coldFactor(默认值为3),经过预热时长后才会达到阈值
com.alibaba.csp.sentinel.slots.block.flow.controller.WarmUpController
public WarmUpController(double count, int warmUpPeriodInSec) {
construct(count, warmUpPeriodInSec, 3);
}
默认coldFactor为3,即请求 QPS 从 threshold / 3 开始,经预热时长逐渐升至设定的 QPS 阈值
阀值为10、预热时长设置6秒。 系统初始化的阀值为10 / 3 约等于3,即阀值刚开始为3;然后过了6秒后阀值才慢慢升高恢复到10
# 排队等待
匀速排队(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER
)方式会严格控制请求通过的间隔时间,也即是让请求以均匀的速度通过,对应的是漏桶算法
这种方式主要用于处理间隔性突发的流量,例如消息队列。想象一下这样的场景,在某一秒有大量的请求到来,而接下来的几秒则处于空闲状态,我们希望系统能够在接下来的空闲期间逐渐处理这些请求,而不是在第一秒直接拒绝多余的请求。
注意:匀速排队模式暂时不支持 QPS > 1000 的场景。
匀速排队,让请求以均匀的速度通过,阀值类型必须设成QPS,否则无效。 设置含义:/testB每秒1次请求,超过的话就排队等待,等待的超时时间为2000毫秒。
com.alibaba.csp.sentinel.slots.block.flow.controller.RateLimiterController
# 熔断降级
https://sentinelguard.io/zh-cn/docs/circuit-breaking.html (opens new window)
现代微服务架构都是分布式的,由非常多的服务组成。不同服务之间相互调用,组成复杂的调用链路。复杂链路上的某一环不稳定,就可能会层层级联,最终导致整个链路都不可用。因此我们需要对不稳定的弱依赖服务调用进行熔断降级,暂时切断不稳定调用,避免局部不稳定因素导致整体的雪崩。熔断降级作为保护自身的手段,通常在客户端(调用端)进行配置。
熔断降级规则(DegradeRule)包含下面几个重要的属性:
Field | 说明 | 默认值 |
---|---|---|
resource | 资源名,即规则的作用对象 | |
grade | 熔断策略,支持慢调用比例/异常比例/异常数策略 | 慢调用比例 |
count | 慢调用比例模式下为慢调用临界 RT(超出该值计为慢调用);异常比例/异常数模式下为对应的阈值 | |
timeWindow | 熔断时长,单位为 s | |
minRequestAmount | 熔断触发的最小请求数,请求数小于该值时即使异常比率超出阈值也不会熔断(1.7.0 引入) | 5 |
statIntervalMs | 统计时长(单位为 ms),如 60*1000 代表分钟级(1.8.0 引入) | 1000 ms |
slowRatioThreshold | 慢调用比例阈值,仅慢调用比例模式有效(1.8.0 引入) |
# 慢调用比例
- 慢调用比例 (
SLOW_REQUEST_RATIO
):选择以慢调用比例作为阈值,需要设置允许的慢调用 RT(即最大的响应时间),请求的响应时间大于该值则统计为慢调用。 - 当单位统计时长(
statIntervalMs
)内请求数目大于设置的最小请求数目,并且慢调用的比例大于阈值,则接下来的熔断时长内请求会自动被熔断。 - 经过熔断时长后熔断器会进入探测恢复状态(HALF-OPEN 状态),若接下来的一个请求响应时间小于设置的慢调用 RT 则结束熔断,若大于设置的慢调用 RT 则会再次被熔断。
- 熔断状态
- OPEN:熔断开启,拒绝请求
- HALF_OPEN:探测恢复状态,接下来的一个请求顺利通过就结束熔断,否则继续熔断
- CLOSED:熔断关闭,请求通过
响应时间 > 500ms为慢调用,在1000ms内有5次以上调用,慢调用比率 > 0.5,就会触发熔断,熔断时长为3s
@GetMapping("/testD")
public String testD() throws InterruptedException {
Thread.sleep(2000);
return "------testD";
}
使用jmeter调用testD接口,再手动访问发现服务不可用,服务进入熔断;关闭压测工具,等待几秒钟,再次访问,服务可用,熔断关闭。
Blocked by Sentinel (flow limiting)
# 异常比例
- 异常比例 (
ERROR_RATIO
):当单位统计时长(statIntervalMs
)内请求数目大于设置的最小请求数目,并且异常的比例大于阈值,则接下来的熔断时长内请求会自动被熔断。经过熔断时长后熔断器会进入探测恢复状态(HALF-OPEN 状态),若接下来的一个请求成功完成(没有错误)则结束熔断,否则会再次被熔断。异常比率的阈值范围是[0.0, 1.0]
,代表 0% - 100%。
1000ms内请求数量 > 5,异常比例 > 0.5,触发熔断,熔断时长为3s
@GetMapping("/testE")
public String testE() {
throw new RuntimeException();
}
使用jmeter调用testE接口,再手动访问发现服务不可用,服务进入熔断;关闭压测工具,等待几秒钟,再次访问,服务可用,熔断关闭。
Blocked by Sentinel (flow limiting)
# 异常数
- 异常数 (
ERROR_COUNT
):当单位统计时长内的异常数目超过阈值之后会自动进行熔断。经过熔断时长后熔断器会进入探测恢复状态(HALF-OPEN 状态),若接下来的一个请求成功完成(没有错误)则结束熔断,否则会再次被熔断。
1000ms 内请求次数 > 5,异常数 > 20,触发熔断,熔断3s
# 热点参数限流
https://github.com/alibaba/Sentinel/wiki/热点参数限流 (opens new window)
何为热点?热点即经常访问的数据。很多时候我们希望统计某个热点数据中访问频次最高的 Top K 数据,并对其访问进行限制。比如:
- 商品 ID 为参数,统计一段时间内最常购买的商品 ID 并进行限制
- 用户 ID 为参数,针对一段时间内频繁访问的用户 ID 进行限制
热点参数限流会统计传入参数中的热点参数,并根据配置的限流阈值与模式,对包含热点参数的资源调用进行限流。热点参数限流可以看做是一种特殊的流量控制,仅对包含热点参数的资源调用生效。
Sentinel 利用 LRU 策略统计最近最常访问的热点参数,结合令牌桶算法来进行参数级别的流控。热点参数限流支持集群模式。
热点参数规则(ParamFlowRule
)类似于流量控制规则(FlowRule
):
属性 | 说明 | 默认值 |
---|---|---|
resource | 资源名,必填 | |
count | 限流阈值,必填 | |
grade | 限流模式 | QPS 模式 |
durationInSec | 统计窗口时间长度(单位为秒),1.6.0 版本开始支持 | 1s |
controlBehavior | 流控效果(支持快速失败和匀速排队模式),1.6.0 版本开始支持 | 快速失败 |
maxQueueingTimeMs | 最大排队等待时长(仅在匀速排队模式生效),1.6.0 版本开始支持 | 0ms |
paramIdx | 热点参数的索引,必填,对应 SphU.entry(xxx, args) | |
中的参数索引位置 | ||
paramFlowItemList | 参数例外项,可以针对指定的参数值单独设置限流阈值,不受前面 count | |
阈值的限制。仅支持基本类型和字符串类型 | ||
clusterMode | 是否是集群参数流控规则 | false |
clusterConfig | 集群流控相关配置 |
注意:若 entry 的时候传入了热点参数,那么 exit 的时候也一定要带上对应的参数(exit(count, args)
),否则可能会有统计错误。正确的示例:
Entry entry = null;
try {
entry = SphU.entry(resourceName, EntryType.IN, 1, paramA, paramB);
// Your logic here.
} catch (BlockException ex) {
// Handle request rejection.
} finally {
if (entry != null) {
entry.exit(1, paramA, paramB);
}
}
其实就是给要执行的方法包了一层,try/catch,先统计流量再执行我们的业务
对于 @SentinelResource
注解方式定义的资源,若注解作用的方法上有参数,Sentinel 会将它们作为参数传入 SphU.entry(res, args)
。比如以下的方法里面 uid
和 type
会分别作为第一个和第二个参数传入 Sentinel API,从而可以用于热点规则判断:
@SentinelResource("myMethod")
public Result doSomething(String uid, int type) {
// some logic here...
}
注意:目前 Sentinel 自带的 adapter 仅 Dubbo 方法埋点带了热点参数,其它适配模块(如 Web)默认不
支持热点规则,可通过自定义埋点方式指定新的资源名并传入希望的参数。注意自定义埋点的资源名不要和适配模块生成的资源名重复,否则会导致重复统计。
com.alibaba.csp.sentinel.slots.block.BlockException
编码
@GetMapping("/testHotKey")
/*
* value 资源名称
* blockHandler 限流处理
*/
@SentinelResource(value = "testHotKey", blockHandler = "dealHandler_testHotKey")
public String testHotKey(@RequestParam(value = "p1", required = false) String p1,
@RequestParam(value = "p2", required = false) String p2) {
return "------testHotKey";
}
public String dealHandler_testHotKey(String p1, String p2, BlockException exception) {
return "-----dealHandler_testHotKey";
}
配置
单台机器,1s的时间窗口内,资源testHotKey的索引为0的参数(就是第一个参数),只能访问一次。
超出阈值,走我们自定义的限流处理;替换之前的默认输出_Blocked by Sentinel (flow limiting)_
-----dealHandler_testHotKey
测试
- http://localhost:8401/testHotKey?p1=abc (opens new window) 限流
- http://localhost:8401/testHotKey?p1=abc&p2=33 (opens new window) 限流
- http://localhost:8401/testHotKey?p2=abc (opens new window) 通过
我们希望特定的值不被限流
当第一个参数的值是_phone_时,限流阈值是30,其他情况下都是1
# 系统自适应保护
系统保护规则是从应用级别的入口流量进行控制,从单台机器的总体 Load、RT、入口 QPS 和线程数四个维度监控应用数据,让系统尽可能跑在最大吞吐量的同时保证系统整体的稳定性。
系统保护规则是应用整体维度的,而不是资源维度的,并且仅对入口流量生效。入口流量指的是进入应用的流量(EntryType.IN
),比如 Web 服务或 Dubbo 服务端接收的请求,都属于入口流量。
系统规则支持以下的阈值类型:
- Load(仅对 Linux/Unix-like 机器生效):当系统 load1 超过阈值,且系统当前的并发线程数超过系统容量时才会触发系统保护。系统容量由系统的
maxQps * minRt
计算得出。设定参考值一般是CPU cores * 2.5
。 - CPU usage(1.5.0+ 版本):当系统 CPU 使用率超过阈值即触发系统保护(取值范围 0.0-1.0)。
- RT:当单台机器上所有入口流量的平均 RT 达到阈值即触发系统保护,单位是毫秒。
- 线程数:当单台机器上所有入口流量的并发线程数达到阈值即触发系统保护。
- 入口 QPS:当单台机器上所有入口流量的 QPS 达到阈值即触发系统保护。
# @SentinelResource
https://github.com/alibaba/Sentinel/wiki/注解支持 (opens new window)
注意:注解方式埋点不支持 private 方法。
@SentinelResource
用于定义资源,并提供可选的异常处理和 fallback 配置项。 @SentinelResource
注解包含以下属性:
value
:资源名称,必需项(不能为空)entryType
:entry 类型,可选项(默认为EntryType.OUT
)blockHandler
/blockHandlerClass
:blockHandler
对应处理BlockException
的函数名称,可选项。blockHandler 函数访问范围需要是public
,返回类型需要与原方法相匹配,参数类型需要和原方法相匹配并且最后加一个额外的参数,类型为BlockException
。blockHandler 函数默认需要和原方法在同一个类中。若希望使用其他类的函数,则可以指定blockHandlerClass
为对应的类的Class
对象,注意对应的函数必需为 static 函数,否则无法解析。fallback
/fallbackClass
:fallback 函数名称,可选项,用于在抛出异常的时候提供 fallback 处理逻辑。fallback 函数可以针对所有类型的异常(除了exceptionsToIgnore
里面排除掉的异常类型)进行处理。fallback 函数签名和位置要求:- 返回值类型必须与原函数返回值类型一致;
- 方法参数列表需要和原函数一致,或者可以额外多一个
Throwable
类型的参数用于接收对应的异常。 - fallback 函数默认需要和原方法在同一个类中。若希望使用其他类的函数,则可以指定
fallbackClass
为对应的类的Class
对象,注意对应的函数必需为 static 函数,否则无法解析。
defaultFallback
(since 1.6.0):默认的 fallback 函数名称,可选项,通常用于通用的 fallback 逻辑(即可以用于很多服务或方法)。默认 fallback 函数可以针对所有类型的异常(除了exceptionsToIgnore
里面排除掉的异常类型)进行处理。若同时配置了 fallback 和 defaultFallback,则只有 fallback 会生效。defaultFallback 函数签名要求:- 返回值类型必须与原函数返回值类型一致;
- 方法参数列表需要为空,或者可以额外多一个
Throwable
类型的参数用于接收对应的异常。
- 方法参数列表需要为空,或者可以额外多一个
- defaultFallback 函数默认需要和原方法在同一个类中。若希望使用其他类的函数,则可以指定
fallbackClass
为对应的类的Class
对象,注意对应的函数必需为 static 函数,否则无法解析。 exceptionsToIgnore
(since 1.6.0):用于指定哪些异常被排除掉,不会计入异常统计中,也不会进入 fallback 逻辑中,而是会原样抛出。
1.8.0 版本开始,defaultFallback
支持在类级别进行配置。
注:1.6.0 之前的版本 fallback 函数只针对降级异常(
DegradeException
)进行处理,不能针对业务异常进行处理。
特别地,若 blockHandler 和 fallback 都进行了配置,则被限流降级而抛出 BlockException
时只会进入 blockHandler
处理逻辑。若未配置 blockHandler
、fallback
和 defaultFallback
,则被限流降级时会将 BlockException
直接抛出(若方法本身未定义 throws BlockException 则会被 JVM 包装一层 UndeclaredThrowableException
)。
# 整合Feign
https://github.com/alibaba/Sentinel/wiki/主流框架的适配 (opens new window)
# 环境搭建
# 服务提供方
服务提供方,注册到nacos(搭建两个一样的,修改端口,以达到负载均衡)
依赖
<dependencies>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>com.starry</groupId>
<artifactId>cloud-api-commons</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
配置文件
server:
port: 9003
spring:
application:
name: nacos-payment-provider
cloud:
nacos:
discovery:
server-addr: localhost:8848
management:
endpoints:
web:
exposure:
include: '*'
启动类
@SpringBootApplication
@EnableDiscoveryClient
public class PaymentMain9003
{
public static void main(String[] args) {
SpringApplication.run(PaymentMain9003.class, args);
}
}
controller
@RestController
public class PaymentController
{
@Value("${server.port}")
private String serverPort;
public static HashMap<Long, Payment> hashMap = new HashMap<>();
static
{
hashMap.put(1L,new Payment(1L,"28a8c1e3bc2742d8848569891fb42181"));
hashMap.put(2L,new Payment(2L,"bba8c1e3bc2742d8848569891ac32182"));
hashMap.put(3L,new Payment(3L,"6ua8c1e3bc2742d8848569891xt92183"));
}
@GetMapping(value = "/paymentSQL/{id}")
public CommonResult<Payment> paymentSQL(@PathVariable("id") Long id)
{
// 使用map,模拟从数据库查询
Payment payment = hashMap.get(id);
CommonResult<Payment> result = new CommonResult(200,"from mysql,serverPort: "+serverPort,payment);
return result;
}
}
# 服务消费方
依赖
<dependencies>
<dependency>
<groupId>com.starry</groupId>
<artifactId>cloud-api-commons</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
</dependencies>
配置文件
server:
port: 84
spring:
application:
name: nacos-order-consumer
cloud:
nacos:
discovery:
server-addr: localhost:8848
sentinel:
transport:
dashboard: localhost:8080
port: 8719
# 开启Sentinel对Feign的支持
feign:
sentinel:
enabled: true
启动类
@EnableDiscoveryClient
@SpringBootApplication
@EnableFeignClients
public class OrderNacosMain84
{
public static void main(String[] args) {
SpringApplication.run(OrderNacosMain84.class, args);
}
}
远程调用接口
@FeignClient(value = "nacos-payment-provider",fallback = PaymentFallbackService.class)
public interface PaymentService
{
@GetMapping(value = "/paymentSQL/{id}")
public CommonResult<Payment> paymentSQL(@PathVariable("id") Long id);
}
降级/限流方法
@Component
public class PaymentFallbackService implements PaymentService {
@Override
public CommonResult<Payment> paymentSQL(Long id) {
return new CommonResult<>(444,"服务降级返回,没有该流水信息");
}
}
controller
@RestController
public class CircleBreakerController {
@Resource
PaymentService paymentService;
@GetMapping("/consumer/openfeign/{id}")
public CommonResult<Payment> paymentSQL(@PathVariable("id") Long id) {
if (id == 4) {
throw new RuntimeException("没有该id");
}
return paymentService.paymentSQL(id);
}
}
必须开启sentinel对feign的支持feign.sentinel.enabled:=true
,否则@FeignClient配置的fallback属性不起效,如果远程调用超时/失败,都会走fallback方法;如果sentinel对feign的接口进行了配置(限流,熔断…),也会走到fallback方法。
快速访问http://localhost:84/consumer/openfeign/2
# 持久化
开源版本:只能手动把一个一个配置到nacos,并不能实现sentinel控制台修改,同步到nacos
https://github.com/alibaba/Sentinel/issues/2420 (opens new window)
https://github.com/alibaba/Sentinel/wiki/FAQ#规则存储与动态规则数据源datasource (opens new window)
或者修改sentinel源码来实现
https://www.jianshu.com/p/9a6cf8634805 (opens new window) https://blog.csdn.net/lilizhou2008/article/details/97075236 (opens new window)
依赖
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-datasource-nacos</artifactId>
</dependency>
配置
server:
port: 8401
spring:
application:
name: cloudalibaba-sentinel-service
cloud:
nacos:
discovery:
server-addr: localhost:8848
sentinel:
transport:
dashboard: localhost:8080
port: 8719
# 持久化配置
datasource:
ds1:
nacos:
server-addr: localhost:8848
dataId: cloudalibaba-sentinel-service
groupId: DEFAULT_GROUP
data-type: json
rule-type: flow
management:
endpoints:
web:
exposure:
include: '*'
feign:
sentinel:
enabled: true
nacos添加配置
[
{
"resource": "/rateLimit/byUrl",
"limitApp": "default",
"grade": 1,
"count": 1,
"strategy": 0,
"controlBehavior": 0,
"clusterMode": false
}
]
- resource:资源名称;
- limitApp:来源应用;
- grade:阈值类型,0表示线程数,1表示QPS;
- count:单机阈值;
- strategy:流控模式,0表示直接,1表示关联,2表示链路;
- controlBehavior:流控效果,0表示快速失败,1表示Warm Up,2表示排队等待;
- clusterMode:是否集群。
sentinel成功读取到配置
重启服务后刷新sentinel控制台,发现没有流控规则,需要被访问过才会出现http://localhost:8401/rateLimit/byUrl,快速访问接口被限流,成功读取到nacos中的配置。
https://github.com/alibaba/Sentinel/wiki/在生产环境中使用-Sentinel (opens new window)
生产环境下一般更常用的是 push 模式的数据源。对于 push 模式的数据源,如远程配置中心(ZooKeeper, Nacos, Apollo等等),推送的操作不应由 Sentinel 客户端进行,而应该经控制台统一进行管理,直接进行推送,数据源仅负责获取配置中心推送的配置并更新到本地。因此推送规则正确做法应该是 配置中心控制台/Sentinel 控制台 → 配置中心 → Sentinel 数据源 → Sentinel,而不是经 Sentinel 数据源推送至配置中心。这样的流程就非常清晰了:
# Seata分布式事务
文档对新手不太友好,新版坑很多
版本选择,不对应会报错!!!
这里选择最新版
https://github.com/alibaba/spring-cloud-alibaba/wiki/版本说明 (opens new window)
https://seata.io/zh-cn/docs/overview/what-is-seata.html (opens new window)
Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。
# Seata术语
分布式事务处理过程的一ID + 三组件模型
Transaction ID XID:全局唯一的事务ID
- TC (Transaction Coordinator) - 事务协调者 维护全局和分支事务的状态,驱动全局事务提交或回滚。
- TM (Transaction Manager) - 事务管理器 定义全局事务的范围:开始全局事务、提交或回滚全局事务。
- RM (Resource Manager) - 资源管理器 管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。
# 执行流程
https://www.bilibili.com/video/BV1X3411q7bC?from=search&seid=5043509781510973905 (opens new window)
- TM 向 TC 申请开启一个全局事务,全局事务创建成功并生成一个全局唯一的 XID;
- XID 在微服务调用链路的上下文中传播;
- RM 向 TC 注册分支事务,将其纳入 XID 对应全局事务的管辖;
- TM 向 TC 发起针对 XID 的全局提交或回滚决议;
- TC 调度 XID 下管辖的全部分支事务完成提交或回滚请求。
# 环境搭建
下载(本次选用最新版1.4.2) https://github.com/seata/seata/releases (opens new window)
配置文件说明 https://seata.io/zh-cn/docs/user/configurations.html (opens new window) https://seata.io/zh-cn/docs/user/txgroup/transaction-group.html (opens new window) https://github.com/seata/seata-samples/blob/master/doc/quick-integration-with-spring-cloud.md (opens new window)
# file.conf
(仅仅是)seata-server的配置,内置的file.conf本地文件的配置方式,可以在register中配置使用本地file还是使用nacos、Apollo、zk等配置中心,如果使用分布式的配置中心,那本地的file就没用了
该配置用于指定TC的相关属性;如果使用注册中心也可以将配置添加到配置中心
新版本的file.conf不全,需要去github找配置
- service vgroupMapping.my_test_tx_group事务组名称
- 需要注意的是
service.vgroup_mapping
这个配置,在 Spring Cloud 中默认是${spring.application.name}-fescar-service-group
,可以通过指定application.properties
的spring.cloud.alibaba.seata.tx-service-group
这个属性覆盖,但是必须要和file.conf
中的一致,否则会提示no available server to connect
- 需要注意的是
- store mode="db"数据库存储
- 修改db的user和password
transport {
# tcp udt unix-domain-socket
type = "TCP"
#NIO NATIVE
server = "NIO"
#enable heartbeat
heartbeat = true
#thread factory for netty
thread-factory {
boss-thread-prefix = "NettyBoss"
worker-thread-prefix = "NettyServerNIOWorker"
server-executor-thread-prefix = "NettyServerBizHandler"
share-boss-worker = false
client-selector-thread-prefix = "NettyClientSelector"
client-selector-thread-size = 1
client-worker-thread-prefix = "NettyClientWorkerThread"
# netty boss thread size,will not be used for UDT
boss-thread-size = 1
#auto default pin or 8
worker-thread-size = 8
}
shutdown {
# when destroy server, wait seconds
wait = 3
}
serialization = "seata"
compressor = "none"
}
service {
#vgroup->rgroup
vgroupMapping.my_test_tx_group = "default"
#only support single node
default.grouplist = "127.0.0.1:8091"
#degrade current not support
enableDegrade = false
#disable
disable = false
#unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanent
max.commit.retry.timeout = "-1"
max.rollback.retry.timeout = "-1"
}
client {
async.commit.buffer.limit = 10000
lock {
retry.internal = 10
retry.times = 30
}
report.retry.count = 5
}
## transaction log store
store {
## store mode: file、db
mode = "db"
## file store
file {
dir = "sessionStore"
# branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
max-branch-session-size = 16384
# globe session size , if exceeded throws exceptions
max-global-session-size = 512
# file buffer size , if exceeded allocate new buffer
file-write-buffer-cache-size = 16384
# when recover batch read size
session.reload.read_size = 100
# async, sync
flush-disk-mode = async
}
## database store
db {
## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc.
datasource = "druid"
## mysql/oracle/h2/oceanbase etc.
db-type = "mysql"
url = "jdbc:mysql://127.0.0.1:3306/seata?rewriteBatchedStatements=true"
user = "root"
password = "root"
min-conn = 1
max-conn = 3
global.table = "global_table"
branch.table = "branch_table"
lock-table = "lock_table"
query-limit = 100
}
}
lock {
## the lock store mode: local、remote
mode = "remote"
local {
## store locks in user's database
}
remote {
## store locks in the seata's server
}
}
recovery {
committing-retry-delay = 30
asyn-committing-retry-delay = 30
rollbacking-retry-delay = 30
timeout-retry-delay = 30
}
transaction {
undo.data.validation = true
undo.log.serialization = "jackson"
}
## metrics settings
metrics {
enabled = false
registry-type = "compact"
# multi exporters use comma divided
exporter-list = "prometheus"
exporter-prometheus-port = 9898
}
# 数据库文件
https://github.com/seata/seata/blob/develop/script/server/db/mysql.sql (opens new window)
-- -------------------------------- The script used when storeMode is 'db' --------------------------------
-- the table to store GlobalSession data
CREATE TABLE IF NOT EXISTS `global_table`
(
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`status` TINYINT NOT NULL,
`application_id` VARCHAR(32),
`transaction_service_group` VARCHAR(32),
`transaction_name` VARCHAR(128),
`timeout` INT,
`begin_time` BIGINT,
`application_data` VARCHAR(2000),
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`xid`),
KEY `idx_status_gmt_modified` (`status` , `gmt_modified`),
KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;
-- the table to store BranchSession data
CREATE TABLE IF NOT EXISTS `branch_table`
(
`branch_id` BIGINT NOT NULL,
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`resource_group_id` VARCHAR(32),
`resource_id` VARCHAR(256),
`branch_type` VARCHAR(8),
`status` TINYINT,
`client_id` VARCHAR(64),
`application_data` VARCHAR(2000),
`gmt_create` DATETIME(6),
`gmt_modified` DATETIME(6),
PRIMARY KEY (`branch_id`),
KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;
-- the table to store lock data
CREATE TABLE IF NOT EXISTS `lock_table`
(
`row_key` VARCHAR(128) NOT NULL,
`xid` VARCHAR(128),
`transaction_id` BIGINT,
`branch_id` BIGINT NOT NULL,
`resource_id` VARCHAR(256),
`table_name` VARCHAR(32),
`pk` VARCHAR(36),
`status` TINYINT NOT NULL DEFAULT '0' COMMENT '0:locked ,1:rollbacking',
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`row_key`),
KEY `idx_status` (`status`),
KEY `idx_branch_id` (`branch_id`),
KEY `idx_xid_and_branch_id` (`xid` , `branch_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;
CREATE TABLE IF NOT EXISTS `distributed_lock`
(
`lock_key` CHAR(20) NOT NULL,
`lock_value` VARCHAR(20) NOT NULL,
`expire` BIGINT,
primary key (`lock_key`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('AsyncCommitting', ' ', 0);
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryCommitting', ' ', 0);
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryRollbacking', ' ', 0);
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('TxTimeoutCheck', ' ', 0);
# register.conf
该配置用于指定 TC 的注册中心和配置文件,默认都是 file; 如果使用其他的注册中心,要求 Seata-Server 也注册到该配置中心上
注册(中心) 和 (读取)配置的地方
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "nacos"
}
- registry是注册中心的配置,将seata注册到注册中心
- config是配置中心,从注册中心中读取seata的配置,如果配置了config{type="nacos"}就是从nacos读取配置,之前的file.conf就没用了
这里是seata-server就是TC
启动nacos,启动seata,查看nacos控制台,seata成功注册到nacos
这里我们会创建三个服务,一个订单服务,一个库存服务,一个账户服务。
当用户下单时,会在订单服务中创建一个订单,然后通过远程调用库存服务来扣减下单商品的库存,再通过远程调用账户服务来扣减用户账户里面的余额,最后在订单服务中修改订单状态为已完成。
该操作跨越三个数据库,有两次远程调用,很明显会有分布式事务问题。
下订单--->扣库存--->减账户(余额)
建库
CREATE DATABASE seata_order;
CREATE DATABASE seata_storage;
CREATE DATABASE seata_account;
建表
USE seata_order;
CREATE TABLE t_order (
`id` BIGINT(11) NOT NULL AUTO_INCREMENT PRIMARY KEY,
`user_id` BIGINT(11) DEFAULT NULL COMMENT '用户id',
`product_id` BIGINT(11) DEFAULT NULL COMMENT '产品id',
`count` INT(11) DEFAULT NULL COMMENT '数量',
`money` DECIMAL(11,0) DEFAULT NULL COMMENT '金额',
`status` INT(1) DEFAULT NULL COMMENT '订单状态:0:创建中;1:已完结'
) ENGINE=INNODB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8;
SELECT * FROM t_order;
USE seata_storage;
CREATE TABLE t_storage (
`id` BIGINT(11) NOT NULL AUTO_INCREMENT PRIMARY KEY,
`product_id` BIGINT(11) DEFAULT NULL COMMENT '产品id',
`total` INT(11) DEFAULT NULL COMMENT '总库存',
`used` INT(11) DEFAULT NULL COMMENT '已用库存',
`residue` INT(11) DEFAULT NULL COMMENT '剩余库存'
) ENGINE=INNODB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
INSERT INTO seata_storage.t_storage(`id`, `product_id`, `total`, `used`, `residue`)
VALUES ('1', '1', '100', '0', '100');
SELECT * FROM t_storage;
USE seata_account;
CREATE TABLE t_account (
`id` BIGINT(11) NOT NULL AUTO_INCREMENT PRIMARY KEY COMMENT 'id',
`user_id` BIGINT(11) DEFAULT NULL COMMENT '用户id',
`total` DECIMAL(10,0) DEFAULT NULL COMMENT '总额度',
`used` DECIMAL(10,0) DEFAULT NULL COMMENT '已用余额',
`residue` DECIMAL(10,0) DEFAULT '0' COMMENT '剩余可用额度'
) ENGINE=INNODB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
INSERT INTO seata_account.t_account(`id`, `user_id`, `total`, `used`, `residue`) VALUES ('1', '1', '1000', '0', '1000');
SELECT * FROM t_account;
# undo_log
给每个业务数据库添加_undo_log_表
https://github.com/seata/seata/blob/develop/script/client/at/db/mysql.sql (opens new window)
-- 对于 AT 模式,您必须为您的业务数据库初始化此 sql。 seata 服务器不需要它。
CREATE TABLE IF NOT EXISTS `undo_log`
(
`branch_id` BIGINT NOT NULL COMMENT 'branch transaction id',
`xid` VARCHAR(128) NOT NULL COMMENT 'global transaction id',
`context` VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
`rollback_info` LONGBLOB NOT NULL COMMENT 'rollback info',
`log_status` INT(11) NOT NULL COMMENT '0:normal status,1:defense status',
`log_created` DATETIME(6) NOT NULL COMMENT 'create datetime',
`log_modified` DATETIME(6) NOT NULL COMMENT 'modify datetime',
UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8mb4 COMMENT ='AT transaction mode undo table';
# 编码
订单服务
依赖
<dependencies>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-loadbalancer</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.2.8</version>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.4.3.1</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
配置
server:
port: 2001
spring:
application:
name: seata-order-service
cloud:
nacos:
discovery:
server-addr: localhost:8848
datasource:
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://localhost:3306/seata_order?useSSL=false
username: root
password: root
# 新版本,老版本是spring.cloud.alibaba.seata.tx-service-group
seata:
tx-service-group: my_test_tx_group
logging:
level:
io:
seata: info
mybatis-plus:
mapper-locations: classpath:mapper/*.xml
# 配置feign的超时,方便后续模拟远程调用失败
feign:
client:
config:
# 这里就是指的所有被加载的默认FeignClient实现的服务配置都生效
default:
connectTimeout: 1000
readTimeout: 2000
# 配置类
Seata 通过代理数据源的方式实现分支事务;MyBatis 和 JPA 都需要注入 io.seata.rm.datasource.DataSourceProxy
, 不同的是,MyBatis 还需要额外注入 org.apache.ibatis.session.SqlSessionFactory
@Configuration
public class DataSourceProxyConfig {
@Value("${mybatis-plus.mapper-locations}")
private String mapperLocations;
@Bean
@ConfigurationProperties(prefix = "spring.datasource")
public DataSource dataSource() {
return new DruidDataSource();
}
/**
* https://github.com/seata/seata/issues/3805
* 从 1.3 开始不鼓励自己注册 DataSourceProxy bean。如果您使用的是seata starter,则无需关心DataSourceProxy(starter会自动处理它),只需以旧方式注册和使用Datasource bean。
*/
/*@Bean
public DataSourceProxy dataSourceProxy(DataSource dataSource) {
return new DataSourceProxy(dataSource);
}*/
@Bean
public SqlSessionFactory sqlSessionFactoryBean(DataSource dataSource) throws Exception {
SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
factoryBean.setDataSource(dataSource);
factoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(mapperLocations));
factoryBean.setTransactionFactory(new SpringManagedTransactionFactory());
return factoryBean.getObject();
}
}
启动类
@EnableDiscoveryClient
@EnableFeignClients
@MapperScan("com.starry.springcloud.mapper")
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
public class SeataOrderMainApp2001 {
public static void main(String[] args) {
SpringApplication.run(SeataOrderMainApp2001.class, args);
}
}
本地订单服务
public interface OrderService {
/**
* 创建订单
*/
void create(Order order);
}
远程库存服务
@FeignClient(value = "seata-storage-service")
public interface StorageService {
/**
* 扣减库存
*/
@PostMapping(value = "/storage/decrease")
CommonResult decrease(@RequestParam("productId") Long productId, @RequestParam("count") Integer count);
}
远程余额服务
@FeignClient(value = "seata-account-service")
public interface AccountService {
/**
* 扣减账户余额
*/
@PostMapping("/account/decrease")
CommonResult decrease(@RequestParam("userId") Long userId, @RequestParam("money") BigDecimal money);
}
业务实现,涉及两次远程服务调用,在TM上加@GlobalTransactional
注解就能实现分布式事务
在业务的发起方的方法上使用**@GlobalTransactional**
开启全局事务,Seata 会将事务的 xid 通过拦截器添加到调用其他服务的请求中,实现分布式事务
@Service
@Slf4j
public class OrderServiceImpl implements OrderService
{
@Resource
private OrderMapper orderMapper;
@Resource
private StorageService storageService;
@Resource
private AccountService accountService;
/**
* 创建订单->调用库存服务扣减库存->调用账户服务扣减账户余额->修改订单状态
* 简单说:
* 下订单->减库存->减余额->改状态
*/
@Override
@GlobalTransactional(rollbackFor = Exception.class)
public void create(Order order) {
log.info("------->下单开始");
//本应用创建订单
orderMapper.create(order);
//远程调用库存服务扣减库存
log.info("------->order-service中扣减库存开始");
storageService.decrease(order.getProductId(),order.getCount());
log.info("------->order-service中扣减库存结束");
//远程调用账户服务扣减余额
log.info("------->order-service中扣减余额开始");
accountService.decrease(order.getUserId(),order.getMoney());
log.info("------->order-service中扣减余额结束");
//修改订单状态为已完成
log.info("------->order-service中修改订单状态开始");
orderMapper.update(order.getUserId(),0);
log.info("------->order-service中修改订单状态结束");
log.info("------->下单结束");
}
}
# 测试
注释@GlobalTransactional
,修改AccountServiceImpl,使线程睡眠,导致order调用超时
- 当库存和账户金额扣减后,订单状态并没有设置为已经完成,没有从零改为1
- 而且由于feign的重试机制,账户余额还有可能被多次扣减
- 数据不一致
开启@GlobalTransactional
,修改AccountServiceImpl,使线程睡眠,导致order调用超时
- 下单后数据库数据并没有任何改变
- rollback
再看执行流程
https://seata.io/zh-cn/docs/overview/what-is-seata.html (opens new window)
- TM 开启分布式事务(TM 向 TC 注册全局事务记录)
- 按业务场景、编排数据库、服务等事务内资源(RM 向 TC 汇报资源准备状态)
- TM 结束分布式事务,事务一阶段结束(TM 通知 TC 提交/回滚分布式事务)
- TC 汇总事务信息,决定分布式事务是提交还是回滚
- TC 通知所有 RM 提交/回滚 资源,事务二阶段结束
# AT模式如何做到对业务的无侵入
两阶段提交协议的演变:
- 一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。
- 二阶段:
- 提交异步化,非常快速地完成。
- 回滚通过一阶段的回滚日志进行反向补偿。
在一阶段,Seata 会拦截“业务 SQL”,
- 解析 SQL 语义,找到“业务 SQL”要更新的业务数据,在业务数据被更新前,将其保存成“before image”,
- 执行“业务 SQL”更新业务数据,在业务数据更新之后,
- 根据前镜像的结果,通过 主键 定位数据,其保存成“after image”,最后生成行锁。
before iamge 和 after image 组成了 undo_log,存在业务数据库
本地事务提交前还要前TC/seata-server数据库,申请行锁,行锁就是 表名+业务主键,行锁数据存在TC数据库
以上操作全部在一个数据库事务内完成,这样保证了一阶段操作的原子性。
二阶段如是顺利提交的话, 因为“业务 SQL”在一阶段已经提交至数据库,所以Seata框架只需将一阶段保存的快照数据和行锁删掉,异步完成数据清理即可。
二阶段回滚: 二阶段如果是回滚的话,Seata 就需要回滚一阶段已经执行的“业务 SQL”,还原业务数据。 回滚方式便是用“before image”还原业务数据;但在还原前要首先要校验脏写,对比“数据库当前业务数据”和 “after image”, 如果两份数据完全一致就说明没有脏写,可以还原业务数据,如果不一致就说明有脏写,出现脏写就需要转人工处理。
debug使程序暂停,查看数据库
seata-server
branch_table
branch_id | xid | transaction_id | resource_group_id | resource_id | branch_type | status | client_id | application_data | gmt_create | gmt_modified |
---|---|---|---|---|---|---|---|---|---|---|
3567092540341715293 | 192.168.83.1:8091:3567092540341715291 | 3567092540341715291 | NULL | jdbc:mysql://localhost:3306/seata_order | AT | 0 | seata-order-service:127.0.0.1:61056 | NULL | 2022-03-17 16:14:27.996932 | 2022-03-17 16:14:27.996932 |
3567092540341715295 | 192.168.83.1:8091:3567092540341715291 | 3567092540341715291 | NULL | jdbc:mysql://localhost:3306/seata_storage | AT | 0 | seata-storage-service:127.0.0.1:61098 | NULL | 2022-03-17 16:14:28.340925 | 2022-03-17 16:14:28.340925 |
3567092540341715298 | 192.168.83.1:8091:3567092540341715291 | 3567092540341715291 | NULL | jdbc:mysql://localhost:3306/seata_account | AT | 0 | seata-account-service:127.0.0.1:61143 | NULL | 2022-03-17 16:14:28.614876 | 2022-03-17 16:14:28.614876 |
global_table
xid | transaction_id | status | application_id | transaction_service_group | transaction_name | timeout | begin_time | application_data | gmt_create | gmt_modified |
---|---|---|---|---|---|---|---|---|---|---|
192.168.83.1:8091:3567092540341715291 | 3567092540341715291 | 5 | seata-order-service | my_test_tx_group | create(com.starry.springcloud.pojo.Order) | 60000 | 1647504867751 | NULL | 2022-03-17 16:14:27 | 2022-03-17 16:15:00 |
lock_table
row_key | xid | transaction_id | branch_id | resource_id | table_name | pk | status | gmt_create | gmt_modified |
---|---|---|---|---|---|---|---|---|---|
jdbc:mysql://localhost:3306/seata_account^^t_account^^1 | 192.168.83.1:8091:3567092540341715291 | 3567092540341715291 | 3567092540341715298 | jdbc:mysql://localhost:3306/seata_account | t_account | 1 | 0 | 2022-03-17 16:14:28 | 2022-03-17 16:14:28 |
jdbc:mysql://localhost:3306/seata_order^^t_order^^18 | 192.168.83.1:8091:3567092540341715291 | 3567092540341715291 | 3567092540341715293 | jdbc:mysql://localhost:3306/seata_order | t_order | 18 | 0 | 2022-03-17 16:14:27 | 2022-03-17 16:14:27 |
jdbc:mysql://localhost:3306/seata_storage^^t_storage^^1 | 192.168.83.1:8091:3567092540341715291 | 3567092540341715291 | 3567092540341715295 | jdbc:mysql://localhost:3306/seata_storage | t_storage | 1 | 0 | 2022-03-17 16:14:28 | 2022-03-17 16:14:28 |
业务数据库
branch_id | xid | context | rollback_info | log_status | log_created | log_modified |
---|---|---|---|---|---|---|
3567092540341715298 | 192.168.83.1:8091:3567092540341715291 | serializer=jackson&compressorType=NONE | {"@class":"io.seata.rm.datasource.undo.BranchUndoLog","xid":"192.168.83.1:8091:3567092540341715291","branchId":3567092540341715298,"sqlUndoLogs":["java.util.ArrayList",[{"@class":"io.seata.rm.datasource.undo.SQLUndoLog","sqlType":"UPDATE","tableName":"t_account","beforeImage":{"@class":"io.seata.rm.datasource.sql.struct.TableRecords","tableName":"t_account","rows":["java.util.ArrayList",[{"@class":"io.seata.rm.datasource.sql.struct.Row","fields":["java.util.ArrayList",[{"@class":"io.seata.rm.datasource.sql.struct.Field","name":"id","keyType":"PRIMARY_KEY","type":-5,"value":["java.lang.Long",1]},{"@class":"io.seata.rm.datasource.sql.struct.Field","name":"residue","keyType":"NULL","type":3,"value":["java.math.BigDecimal",1000]},{"@class":"io.seata.rm.datasource.sql.struct.Field","name":"used","keyType":"NULL","type":3,"value":["java.math.BigDecimal",0]}]]}]]},"afterImage":{"@class":"io.seata.rm.datasource.sql.struct.TableRecords","tableName":"t_account","rows":["java.util.ArrayList",[{"@class":"io.seata.rm.datasource.sql.struct.Row","fields":["java.util.ArrayList",[{"@class":"io.seata.rm.datasource.sql.struct.Field","name":"id","keyType":"PRIMARY_KEY","type":-5,"value":["java.lang.Long",1]},{"@class":"io.seata.rm.datasource.sql.struct.Field","name":"residue","keyType":"NULL","type":3,"value":["java.math.BigDecimal",900]},{"@class":"io.seata.rm.datasource.sql.struct.Field","name":"used","keyType":"NULL","type":3,"value":["java.math.BigDecimal",100]}]]}]]}}]]} | 0 | 2022-03-17 16:14:28.642476 | 2022-03-17 16:14:28.642476 |
# 项目地址
https://notes-fanxing.oss-cn-hangzhou.aliyuncs.com/zip/cloudlearn.zip (opens new window) https://github.com/starriesWEB/springcloud_learning (opens new window)