Spring cloud Gateway自定义限流
背景
产品中使用spring cloud gateway,需要实现根据请求报文中的字段值来实现限流,比如在双11场景下,需要对微信渠道进行限流,每秒不能超过10笔。
方案
首先是限流的实现,强大的spring cloud gateway已经为我们提供了一套限流的接口,并且实现了一套基于redis的限流方案,它使用的是令牌桶算法

但是实现的相对简单,无法满足我们指定key-value进行限流,还需要我们动手进行一些改造。
其次是获取请求的报文体,和我们之前在同步阻塞的web框架中获取略有不同,构成如下:

spring cloud gateway采用异步非阻塞的框架,基于springboot2.0,大量使用了Reactor。后面在实现的时候需要针对这个特性来做相应的coding。

实现
限流
spring cloud gateway流程如下

http请求进入到网关主要经历如下几个阶段
- DispatcherHandler 所有请求的调度器,将请求进行分发
- RoutePredicateHandlerMapping 路由谓词匹配,通过内部的lookupRoute方法匹配路由,匹配到对应的路由后,返回该路由对应的FilteringWebHandler
- FilteringWebHandler 负责组装filter,处理请求
我们要处理的限流实际上是gateway中的filter,事实上gateway已经提供了对应的实现RequestRateLimiterGatewayFilterFactory
其核心代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| public static final String KEY_RESOLVER_KEY = "keyResolver"; private final RateLimiter defaultRateLimiter; private final KeyResolver defaultKeyResolver;
public RequestRateLimiterGatewayFilterFactory(RateLimiter defaultRateLimiter, KeyResolver defaultKeyResolver) { super(RequestRateLimiterGatewayFilterFactory.Config.class); this.defaultRateLimiter = defaultRateLimiter; this.defaultKeyResolver = defaultKeyResolver; }
public KeyResolver getDefaultKeyResolver() { return this.defaultKeyResolver; }
public RateLimiter getDefaultRateLimiter() { return this.defaultRateLimiter; }
public GatewayFilter apply(RequestRateLimiterGatewayFilterFactory.Config config) { KeyResolver resolver = config.keyResolver == null ? this.defaultKeyResolver : config.keyResolver; RateLimiter<Object> limiter = config.rateLimiter == null ? this.defaultRateLimiter : config.rateLimiter; return (exchange, chain) -> { Route route = (Route)exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR); return resolver.resolve(exchange).flatMap((key) -> { return limiter.isAllowed(route.getId(), key).flatMap((response) -> { Iterator var4 = response.getHeaders().entrySet().iterator();
while(var4.hasNext()) { Entry<String, String> header = (Entry)var4.next(); exchange.getResponse().getHeaders().add((String)header.getKey(), (String)header.getValue()); }
if (response.isAllowed()) { return chain.filter(exchange); } else { exchange.getResponse().setStatusCode(config.getStatusCode()); return exchange.getResponse().setComplete(); } }); }); }; }
|
其中两个核心的成员,一个是RateLimiter
,主要提供限流的算法和实现,这个也是我们后面需要进行实现扩展的,一个是keyresolver
,主要是提供进行限流的key,比如对ipaddress进行限流,但是官方提供的这种方式采用的是无差别的限流,只要是指定的key,对应的所有value都要进行对应的限流操作,这与我们要求的指定value进行限流还是有很大的差距。后面我们也需要对这部分进行修改。
基于Redis的RateLimiter
gateway官方提供了一个RateLimiter的实现,RedisRateLimiter
,内部已将令牌桶等算法进行了实现,使用redis expire来进行限流的控制。这部分内容我们就不需要修改了。主要需要修改的地方是isAllowed方法。这个方法主要用来判断请求是否被限制。我们的修改也主要是针对该方法。
首先来看下如何添加一个限流的filter
1 2 3 4 5 6 7 8 9 10 11 12 13
| { "filters":[ { "name":"RequestRateLimiter", "args":{ "redis-rate-limiter.burstCapacity":"1", "key-resolver":"#{@remoteAddrResolver}", "rate-limiter":"#{@redisRateLimiter}", "redis-rate-limiter.replenishRate":"1" } } ] }
|
其中name对应的是限流filter的名称,这个在前面已经提到,在FilteringWebHandler来识别处理,此处不能修改。里面对应的args包括:
redis-rate-limiter.burstCapacity
bucket的总容量
key-resolver
采用SpEL添加指定的keyResolver
rate-limiter
同上,添加指定的rate-limiter实现,官方的gateway-sample里面的配置是redisRateLimiter,可以替换为自己的限流实现
redis-rate-limiter.replenishRate
每秒刷新率,通俗来说就是每秒允许的请求数
下面我们要基于RedisRateLimiter修改isAllowed方法
1 2 3 4 5 6 7 8 9
| public class CustomRedisRateLimiter extends AbstractRateLimiter<CustomRedisRateLimiter.Config> implements ApplicationContextAware { @Deprecated public static final String REPLENISH_RATE_KEY = "replenishRate"; @Deprecated public static final String BURST_CAPACITY_KEY = "burstCapacity"; @Autowired private StringRedisTemplate stringRedisTemplate;
public static final String CONFIGURATION_PROPERTY_NAME = "custom-redis-rate-limiter";
|
创建的自己的CustomRedisRatelimiter类,继承AbstractRateLimiter,其中CONFIGURATION_PROPERTY_NAME其实就对应上面的配置中的redis-rate-limiter,在加载配置的时候,会获取args对应key的值,在这里将名字定义为custom-redis-rate-limiter,后面在添加配置的时候,需要将对应的刷新率以及容量的参数改为custom-redis-rate-limiter开头。
修改isAllowed方法
首先对配置做一些调整,添加一个自定义的value字段,这个字段就是对应需要做限流字段的value,需要配置对应的keyResolver来实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| { "filters":[ { "name":"RequestRateLimiter", "args":{ "custom-redis-rate-limiter.burstCapacity":"1", "key-resolver":"#{@channelKeyResolver}", "rate-limiter":"#{@customRedisRateLimiter}", "custom-redis-rate-limiter.replenishRate":"1", "custom-redis-rate-limiter.value":"MT" } } ] }
|
其中,rate-limiter替换为自己实现的customRedisRateLimiter,添加了value字段,对应的isAllowed的代码片段如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| List<String> keys = getKeys(id); GatewayFilterDefinition gfDefinition = JSON.parseObject(stringRedisTemplate.opsForHash().get(RATE_CONFIG, routeId).toString(), GatewayFilterDefinition.class); if(!gfDefinition.getArgs().get(RATE_VALUE).equals(id)){ return Mono.just(new Response(true, getHeaders(routeConfig, -1L))); }
List<String> scriptArgs = Arrays.asList(replenishRate + "", burstCapacity + "", Instant.now().getEpochSecond() + "", "1"); Flux<List<Long>> flux = this.redisTemplate.execute(this.script, keys, scriptArgs); return flux.onErrorResume(throwable -> Flux.just(Arrays.asList(1L, -1L))) .reduce(new ArrayList<Long>(), (longs, l) -> { longs.addAll(l); return longs; }).map(results -> { boolean allowed = results.get(0) == 1L; Long tokensLeft = results.get(1);
Response response = new Response(allowed, getHeaders(routeConfig, tokensLeft));
if (log.isDebugEnabled()) { log.debug("response: " + response); } return response; });
|
主要修改在第2行~第7行,因为我采用动态配置的方式,所有的RouteDefinition在添加和修改的时候,都会在redis中存储一份,此处通过routeId从Redis获取该路由对应的限流filter。获取我们自定义的value。isAllowed的第二个参数就是通过指定的keyResolver获取指定key的value,如果与预设的value一致,则,执行后续通过redis脚本进行限流的判断,如果不一致,则直接返回response,不做限流处理。
最后需要在通过@configuration中配置@bean让自定义的限流器生效
1 2 3 4 5 6 7
| @Bean @Primary public CustomRedisRateLimiter customRedisRateLimiter(ReactiveRedisTemplate<String, String> redisTemplate, @Qualifier(CustomRedisRateLimiter.REDIS_SCRIPT_NAME) RedisScript<List<Long>> redisScript, Validator validator) { return new CustomRedisRateLimiter(redisTemplate, redisScript, validator); }
|
一定要添加@primary。
keyResolver
上面在RequestRateLimiterGatewayFilterFactory
中已经看到keyresolver的处理,我们如果需要限流的字段,只需要添加一个新的KeyResolver即可。
1 2 3 4 5 6 7 8
| public class RemoteAddrKeyResolver implements KeyResolver { public static final String BEAN_NAME = "remoteAddrKeyResolver";
@Override public Mono<String> resolve(ServerWebExchange serverWebExchange) { return Mono.just(serverWebExchange.getRequest().getRemoteAddress().getAddress().getHostAddress()); } }
|
比如对客户端的ipAddress进行限流。
过滤器
通过上面的方式,我们已经可以通过对ipaddress,QueryParam等实现指定key以及value的限流了。但是要对RequestBody的内容进行限流,直接通过keyResolver是无法实现的。首先gateway基于Reactor,前面也提到,如果想要在keyResolver中直接获取requestbody的信息如:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| @Override public Mono<String> resolve(ServerWebExchange exchange) { Flux<DataBuffer> body = exchange.getRequest().getBody();
AtomicReference<String> bodyRef = new AtomicReference<>(); body.subscribe(buffer -> { CharBuffer charBuffer = StandardCharsets.UTF_8.decode(buffer.asByteBuffer()); DataBufferUtils.release(buffer); bodyRef.set(charBuffer.toString()); }); return bodyRef.get(); }
|
运行时就会提示
1 2
| > Only one connection receive subscriber allowed >
|
其实这就是reactor的特性,整个数据流的内容如果想要截断获取,就必须重构一个ServerWebExchange,流向下一个节点。也就是保证整个流程的异步。所以基于这个特点。我们如果想要获取requestbody的内容。就需要在限流过滤器之前创建一个全局的过滤器(保证在所有的gatewayFilter之前执行),将RequestBody信息拿出来,然后再构建一个新的Exchange,并将这个body放在exchange的Attribute中。以后后面的filter获取
首先创建GetRequestBodyFilter
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| @Component public class GetRequestBodyFilter implements GlobalFilter, Ordered { @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { ServerHttpRequest serverHttpRequest = exchange.getRequest(); String method = serverHttpRequest.getMethodValue(); String bodyStr = resolveBodyFromRequest(serverHttpRequest);
URI uri = serverHttpRequest.getURI(); ServerHttpRequest request = serverHttpRequest.mutate().uri(uri).build(); DataBuffer bodyDataBuffer = stringBuffer(bodyStr); Flux<DataBuffer> bodyFlux = Flux.just(bodyDataBuffer);
request = new ServerHttpRequestDecorator(request) { @Override public Flux<DataBuffer> getBody() { return bodyFlux; } }; ServerWebExchange newExchange = exchange.mutate().request(request).build(); newExchange.getAttributes().put("bodyCache",bodyStr); return chain.filter(newExchange); }
private String resolveBodyFromRequest(ServerHttpRequest serverHttpRequest) { Flux<DataBuffer> body = serverHttpRequest.getBody();
AtomicReference<String> bodyRef = new AtomicReference<>(); body.subscribe(buffer -> { CharBuffer charBuffer = StandardCharsets.UTF_8.decode(buffer.asByteBuffer()); DataBufferUtils.release(buffer); bodyRef.set(charBuffer.toString()); }); return bodyRef.get(); }
private DataBuffer stringBuffer(String value) { byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
NettyDataBufferFactory nettyDataBufferFactory = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT); DataBuffer buffer = nettyDataBufferFactory.allocateBuffer(bytes.length); buffer.write(bytes); return buffer; }
@Override public int getOrder() { return 0; } }
|
在fitler中我们可以看到,implements的是GlobalFiilter,里面通过exchange.getRequest().getBody()对body信息进行解析。并重新放入到新的exchange的attribute中。
如此一来,我们在后面的filter使用的keyResolver中就可以使用body信息了。
1 2 3 4 5 6
| @Override public Mono<String> resolve(ServerWebExchange exchange) { String cache = (String) exchange.getAttribute("bodyCache"); SystemRequest systemRequest= JSON.parseObject(cache,SystemRequest.class); return Mono.just(systemRequest.getSystemHeader().getChannel()); }
|
其实这个方法不光可以传递body的信息,同时还可以对请求数据进行统一的鉴权等处理。