限制每个商家的每种类型的接口请求做限流。例如:同一商家每秒仅允许20个签约请求。当每秒有20个以上的请求时,它将提示“对接口进行签名的客户端请求数超过了限制”。
然后,作为下游系统,我们需要控制并发以防止无效请求。最常用的并发电流限制方案是使用redis / jedis。为了确保原子性,在这里,我使用Redis + LUA脚本进行控制。然后,对于服务提供商,当请求数量超过设置的限流阈值时,将直接返回错误代码/错误提示,并终止请求的处理。对于调用者,我们要做的是:当并发请求超过限制的阈值时,请延迟请求,而不是直接丢弃它。
如下RedisLimiter类,服务提供方使用limit方法实现限流,服务调用方使用limitWait方法实现限流等待(如需)
package jstudy.redislimit;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.stereotype.Component;import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;/*** Redis+Lua实现高并发限流*/
@Slf4j
@Component
public class RedisLimiter {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;/*** 达到限流时,则等待,直到新的间隔。** @param key* @param limitCount* @param limitSecond*/public void limitWait(String key, int limitCount, int limitSecond) {boolean ok;//放行标志do {ok = limit(key, limitCount, limitSecond);log.info("放行标志={}", ok);if (!ok) {Long ttl = redisTemplate.getExpire(key, TimeUnit.MILLISECONDS);if (null != ttl && ttl > 0) {try {Thread.sleep(ttl);log.info("sleeped:{}", ttl);} catch (InterruptedException e) {e.printStackTrace();}}}} while (!ok);}/*** 限流方法 true-放行;false-限流** @param key* @param limitCount* @param limitSecond* @return*/public boolean limit(String key, int limitCount, int limitSecond) {List<String> keys = Collections.singletonList(key);String luaScript = buildLuaScript();RedisScript<Number> redisScript = new DefaultRedisScript<>(luaScript, Number.class);Number count = redisTemplate.execute(redisScript, keys, limitCount, limitSecond);log.info("Access try count is {} for key = {}", count, key);if (count != null && count.intValue() <= limitCount) {return true;//放行} else {return false;//限流
// throw new RuntimeException("You have been dragged into the blacklist");}}/*** 编写 redis Lua 限流脚本*/public String buildLuaScript() {StringBuilder lua = new StringBuilder();lua.append("local c");lua.append("\nc = redis.call('get',KEYS[1])");// 调用不超过最大值,则直接返回lua.append("\nif c and tonumber(c) > tonumber(ARGV[1]) then");lua.append("\nreturn c;");lua.append("\nend");// 执行计算器自加lua.append("\nc = redis.call('incr',KEYS[1])");lua.append("\nif tonumber(c) == 1 then");// 从第一次调用开始限流,设置对应键值的过期lua.append("\nredis.call('expire',KEYS[1],ARGV[2])");lua.append("\nend");lua.append("\nreturn c;");return lua.toString();}}
springboot自动注入的RedisTemplate是RedisTemplate<Object,Object>泛型, 上面class使用RedisTemplate<String, Object>,bean定义如下:
package jstudy.redislimit;import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectMapper.DefaultTyping;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;@Configuration
@EnableCaching // 开启缓存支持
public class RedisConfig extends CachingConfigurerSupport {/*** RedisTemplate配置** @param lettuceConnectionFactory* @return*/@Beanpublic RedisTemplate<String, Object> redisTemplate(LettuceConnectionFactory lettuceConnectionFactory) {// 设置序列化Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(Object.class);ObjectMapper om = new ObjectMapper();om.setVisibility(PropertyAccessor.ALL, Visibility.ANY);om.enableDefaultTyping(DefaultTyping.NON_FINAL);jackson2JsonRedisSerializer.setObjectMapper(om);// 配置redisTemplateRedisTemplate<String, Object> redisTemplate = new RedisTemplate<String, Object>();redisTemplate.setConnectionFactory(lettuceConnectionFactory);RedisSerializer<?> stringSerializer = new StringRedisSerializer();redisTemplate.setKeySerializer(stringSerializer);// key序列化redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);// value序列化redisTemplate.setHashKeySerializer(stringSerializer);// Hash key序列化redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);// Hash value序列化redisTemplate.afterPropertiesSet();return redisTemplate;}}
并发测试通过,如下是testcase:
package jstudy.redislimit;import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;@Slf4j
@SpringBootTest
@RunWith(SpringRunner.class)
public class RedisLimiterTest {@Autowiredprivate RedisLimiter redisLimiter;@Testpublic void testLimitWait() throws InterruptedException {ExecutorService pool = Executors.newCachedThreadPool();log.info("--------{}", redisTemplate.opsForValue().get("abc"));for (int j = 1; j <= 5; j++) {int i=j;pool.execute(() -> {Thread.currentThread().setName( Thread.currentThread().getName().replace("-","_"));redisLimiter.limitWait("abc", 3, 1);log.info(i + ":" + true + " ttl:" + redisTemplate.getExpire("abc", TimeUnit.MILLISECONDS));try {// 线程等待,模拟执行业务逻辑Thread.sleep(new Random().nextInt(100));} catch (InterruptedException e) {e.printStackTrace();}});}pool.shutdown();pool.awaitTermination(2,TimeUnit.SECONDS);}
}