0%

gateway自定义限流

Spring cloud Gateway自定义限流

背景

产品中使用spring cloud gateway,需要实现根据请求报文中的字段值来实现限流,比如在双11场景下,需要对微信渠道进行限流,每秒不能超过10笔。

方案

首先是限流的实现,强大的spring cloud gateway已经为我们提供了一套限流的接口,并且实现了一套基于redis的限流方案,它使用的是令牌桶算法

但是实现的相对简单,无法满足我们指定key-value进行限流,还需要我们动手进行一些改造。

其次是获取请求的报文体,和我们之前在同步阻塞的web框架中获取略有不同,构成如下:

img

spring cloud gateway采用异步非阻塞的框架,基于springboot2.0,大量使用了Reactor。后面在实现的时候需要针对这个特性来做相应的coding。

img

实现

限流

spring cloud gateway流程如下

img

http请求进入到网关主要经历如下几个阶段

  • DispatcherHandler 所有请求的调度器,将请求进行分发
  • RoutePredicateHandlerMapping 路由谓词匹配,通过内部的lookupRoute方法匹配路由,匹配到对应的路由后,返回该路由对应的FilteringWebHandler
  • FilteringWebHandler 负责组装filter,处理请求

我们要处理的限流实际上是gateway中的filter,事实上gateway已经提供了对应的实现RequestRateLimiterGatewayFilterFactory 其核心代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public static final String KEY_RESOLVER_KEY = "keyResolver";
private final RateLimiter defaultRateLimiter;
private final KeyResolver defaultKeyResolver;

public RequestRateLimiterGatewayFilterFactory(RateLimiter defaultRateLimiter, KeyResolver defaultKeyResolver) {
super(RequestRateLimiterGatewayFilterFactory.Config.class);
this.defaultRateLimiter = defaultRateLimiter;
this.defaultKeyResolver = defaultKeyResolver;
}

public KeyResolver getDefaultKeyResolver() {
return this.defaultKeyResolver;
}

public RateLimiter getDefaultRateLimiter() {
return this.defaultRateLimiter;
}

public GatewayFilter apply(RequestRateLimiterGatewayFilterFactory.Config config) {
KeyResolver resolver = config.keyResolver == null ? this.defaultKeyResolver : config.keyResolver;
RateLimiter<Object> limiter = config.rateLimiter == null ? this.defaultRateLimiter : config.rateLimiter;
return (exchange, chain) -> {
Route route = (Route)exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
return resolver.resolve(exchange).flatMap((key) -> {
return limiter.isAllowed(route.getId(), key).flatMap((response) -> {
Iterator var4 = response.getHeaders().entrySet().iterator();

while(var4.hasNext()) {
Entry<String, String> header = (Entry)var4.next();
exchange.getResponse().getHeaders().add((String)header.getKey(), (String)header.getValue());
}

if (response.isAllowed()) {
return chain.filter(exchange);
} else {
exchange.getResponse().setStatusCode(config.getStatusCode());
return exchange.getResponse().setComplete();
}
});
});
};
}

其中两个核心的成员,一个是RateLimiter,主要提供限流的算法和实现,这个也是我们后面需要进行实现扩展的,一个是keyresolver,主要是提供进行限流的key,比如对ipaddress进行限流,但是官方提供的这种方式采用的是无差别的限流,只要是指定的key,对应的所有value都要进行对应的限流操作,这与我们要求的指定value进行限流还是有很大的差距。后面我们也需要对这部分进行修改。

基于Redis的RateLimiter

gateway官方提供了一个RateLimiter的实现,RedisRateLimiter,内部已将令牌桶等算法进行了实现,使用redis expire来进行限流的控制。这部分内容我们就不需要修改了。主要需要修改的地方是isAllowed方法。这个方法主要用来判断请求是否被限制。我们的修改也主要是针对该方法。

首先来看下如何添加一个限流的filter

1
2
3
4
5
6
7
8
9
10
11
12
13
{
"filters":[
{
"name":"RequestRateLimiter",
"args":{
"redis-rate-limiter.burstCapacity":"1",
"key-resolver":"#{@remoteAddrResolver}",
"rate-limiter":"#{@redisRateLimiter}",
"redis-rate-limiter.replenishRate":"1"
}
}
]
}

其中name对应的是限流filter的名称,这个在前面已经提到,在FilteringWebHandler来识别处理,此处不能修改。里面对应的args包括:

redis-rate-limiter.burstCapacity bucket的总容量

key-resolver 采用SpEL添加指定的keyResolver

rate-limiter 同上,添加指定的rate-limiter实现,官方的gateway-sample里面的配置是redisRateLimiter,可以替换为自己的限流实现

redis-rate-limiter.replenishRate每秒刷新率,通俗来说就是每秒允许的请求数

下面我们要基于RedisRateLimiter修改isAllowed方法

1
2
3
4
5
6
7
8
9
public class CustomRedisRateLimiter extends AbstractRateLimiter<CustomRedisRateLimiter.Config> implements ApplicationContextAware {
@Deprecated
public static final String REPLENISH_RATE_KEY = "replenishRate";
@Deprecated
public static final String BURST_CAPACITY_KEY = "burstCapacity";
@Autowired
private StringRedisTemplate stringRedisTemplate;

public static final String CONFIGURATION_PROPERTY_NAME = "custom-redis-rate-limiter";

创建的自己的CustomRedisRatelimiter类,继承AbstractRateLimiter,其中CONFIGURATION_PROPERTY_NAME其实就对应上面的配置中的redis-rate-limiter,在加载配置的时候,会获取args对应key的值,在这里将名字定义为custom-redis-rate-limiter,后面在添加配置的时候,需要将对应的刷新率以及容量的参数改为custom-redis-rate-limiter开头。

修改isAllowed方法

首先对配置做一些调整,添加一个自定义的value字段,这个字段就是对应需要做限流字段的value,需要配置对应的keyResolver来实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
{
"filters":[
{
"name":"RequestRateLimiter",
"args":{
"custom-redis-rate-limiter.burstCapacity":"1",
"key-resolver":"#{@channelKeyResolver}",
"rate-limiter":"#{@customRedisRateLimiter}",
"custom-redis-rate-limiter.replenishRate":"1",
"custom-redis-rate-limiter.value":"MT"
}
}
]
}

其中,rate-limiter替换为自己实现的customRedisRateLimiter,添加了value字段,对应的isAllowed的代码片段如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
List<String> keys = getKeys(id);
GatewayFilterDefinition gfDefinition = JSON.parseObject(stringRedisTemplate.opsForHash().get(RATE_CONFIG, routeId).toString(), GatewayFilterDefinition.class);
if(!gfDefinition.getArgs().get(RATE_VALUE).equals(id)){
return Mono.just(new Response(true, getHeaders(routeConfig, -1L)));
}

// The arguments to the LUA script. time() returns unixtime in seconds.
List<String> scriptArgs = Arrays.asList(replenishRate + "", burstCapacity + "",
Instant.now().getEpochSecond() + "", "1");
// allowed, tokens_left = redis.eval(SCRIPT, keys, args)
Flux<List<Long>> flux = this.redisTemplate.execute(this.script, keys, scriptArgs);
return flux.onErrorResume(throwable -> Flux.just(Arrays.asList(1L, -1L)))
.reduce(new ArrayList<Long>(), (longs, l) -> {
longs.addAll(l);
return longs;
}).map(results -> {
boolean allowed = results.get(0) == 1L;
Long tokensLeft = results.get(1);

Response response = new Response(allowed, getHeaders(routeConfig, tokensLeft));

if (log.isDebugEnabled()) {
log.debug("response: " + response);
}
return response;
});

主要修改在第2行~第7行,因为我采用动态配置的方式,所有的RouteDefinition在添加和修改的时候,都会在redis中存储一份,此处通过routeId从Redis获取该路由对应的限流filter。获取我们自定义的value。isAllowed的第二个参数就是通过指定的keyResolver获取指定key的value,如果与预设的value一致,则,执行后续通过redis脚本进行限流的判断,如果不一致,则直接返回response,不做限流处理。

最后需要在通过@configuration中配置@bean让自定义的限流器生效

1
2
3
4
5
6
7
@Bean
@Primary
public CustomRedisRateLimiter customRedisRateLimiter(ReactiveRedisTemplate<String, String> redisTemplate,
@Qualifier(CustomRedisRateLimiter.REDIS_SCRIPT_NAME) RedisScript<List<Long>> redisScript,
Validator validator) {
return new CustomRedisRateLimiter(redisTemplate, redisScript, validator);
}

一定要添加@primary。

keyResolver

上面在RequestRateLimiterGatewayFilterFactory中已经看到keyresolver的处理,我们如果需要限流的字段,只需要添加一个新的KeyResolver即可。

1
2
3
4
5
6
7
8
public class RemoteAddrKeyResolver implements KeyResolver {
public static final String BEAN_NAME = "remoteAddrKeyResolver";

@Override
public Mono<String> resolve(ServerWebExchange serverWebExchange) {
return Mono.just(serverWebExchange.getRequest().getRemoteAddress().getAddress().getHostAddress());
}
}

比如对客户端的ipAddress进行限流。

过滤器

通过上面的方式,我们已经可以通过对ipaddress,QueryParam等实现指定key以及value的限流了。但是要对RequestBody的内容进行限流,直接通过keyResolver是无法实现的。首先gateway基于Reactor,前面也提到,如果想要在keyResolver中直接获取requestbody的信息如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
public Mono<String> resolve(ServerWebExchange exchange) {
//获取请求体
Flux<DataBuffer> body = exchange.getRequest().getBody();

AtomicReference<String> bodyRef = new AtomicReference<>();
body.subscribe(buffer -> {
CharBuffer charBuffer = StandardCharsets.UTF_8.decode(buffer.asByteBuffer());
DataBufferUtils.release(buffer);
bodyRef.set(charBuffer.toString());
});
//获取request body
return bodyRef.get();
}

运行时就会提示

1
2
> Only one connection receive subscriber allowed
>

其实这就是reactor的特性,整个数据流的内容如果想要截断获取,就必须重构一个ServerWebExchange,流向下一个节点。也就是保证整个流程的异步。所以基于这个特点。我们如果想要获取requestbody的内容。就需要在限流过滤器之前创建一个全局的过滤器(保证在所有的gatewayFilter之前执行),将RequestBody信息拿出来,然后再构建一个新的Exchange,并将这个body放在exchange的Attribute中。以后后面的filter获取

首先创建GetRequestBodyFilter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
@Component
public class GetRequestBodyFilter implements GlobalFilter, Ordered {
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ServerHttpRequest serverHttpRequest = exchange.getRequest();
String method = serverHttpRequest.getMethodValue();
//从请求里获取Post请求体
String bodyStr = resolveBodyFromRequest(serverHttpRequest);

//下面的将请求体再次封装写回到request里,传到下一级,否则,由于请求体已被消费,后续的服务将取不到值
URI uri = serverHttpRequest.getURI();
ServerHttpRequest request = serverHttpRequest.mutate().uri(uri).build();
DataBuffer bodyDataBuffer = stringBuffer(bodyStr);
Flux<DataBuffer> bodyFlux = Flux.just(bodyDataBuffer);

request = new ServerHttpRequestDecorator(request) {
@Override
public Flux<DataBuffer> getBody() {
return bodyFlux;
}
};
ServerWebExchange newExchange = exchange.mutate().request(request).build();
newExchange.getAttributes().put("bodyCache",bodyStr);
//封装request,传给下一级
return chain.filter(newExchange);
}

/**
* 从Flux<DataBuffer>中获取字符串的方法
*
* @return 请求体
*/
private String resolveBodyFromRequest(ServerHttpRequest serverHttpRequest) {
//获取请求体
Flux<DataBuffer> body = serverHttpRequest.getBody();

AtomicReference<String> bodyRef = new AtomicReference<>();
body.subscribe(buffer -> {
CharBuffer charBuffer = StandardCharsets.UTF_8.decode(buffer.asByteBuffer());
DataBufferUtils.release(buffer);
bodyRef.set(charBuffer.toString());
});
//获取request body
return bodyRef.get();
}

private DataBuffer stringBuffer(String value) {
byte[] bytes = value.getBytes(StandardCharsets.UTF_8);

NettyDataBufferFactory nettyDataBufferFactory = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);
DataBuffer buffer = nettyDataBufferFactory.allocateBuffer(bytes.length);
buffer.write(bytes);
return buffer;
}

@Override
public int getOrder() {
return 0;
}
}

在fitler中我们可以看到,implements的是GlobalFiilter,里面通过exchange.getRequest().getBody()对body信息进行解析。并重新放入到新的exchange的attribute中。

如此一来,我们在后面的filter使用的keyResolver中就可以使用body信息了。

1
2
3
4
5
6
@Override
public Mono<String> resolve(ServerWebExchange exchange) {
String cache = (String) exchange.getAttribute("bodyCache");
SystemRequest systemRequest= JSON.parseObject(cache,SystemRequest.class);
return Mono.just(systemRequest.getSystemHeader().getChannel());
}

其实这个方法不光可以传递body的信息,同时还可以对请求数据进行统一的鉴权等处理。