Preface:
spring-data-redis lets a Spring project operate Redis quickly and easily through RedisTemplate, and spring-boot-starter-data-redis makes the integration even more convenient.
How does Spring Boot integrate with Redis as a cache? Configure application.yml as follows:
```yaml
spring:
  redis:
    host: 127.0.0.1
    port: 6379
    database: 0
    timeout: 1000
    pool:
      max-idle: 200
      min-idle: 0
      max-active: 200
      max-wait: 1000
```
Spring Boot auto-configures the corresponding beans. Note that a Jedis pool is used here to improve performance; it is optional. By adding the following annotations to methods, caching can be used non-invasively:
@Cacheable — read from the cache, populating it on a miss
@CachePut — put/update the cached value
@CacheEvict — evict (invalidate) the cached value
@Caching — combine several cache operations
These annotations are not covered in detail here; a minimal usage sketch is shown below.
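A minimal sketch of how these annotations are used on a service. The service, the cache name "books", and the method names are illustrative assumptions, not part of the original post.

```java
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.CachePut;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Service;

@Service
public class BookService {

    // Read-through: executes only on a cache miss and stores the result under the given key.
    @Cacheable(value = "books", key = "#isbn")
    public String findTitleByIsbn(String isbn) {
        return loadTitleFromDatabase(isbn);
    }

    // Always executes and overwrites the cached value with the returned one.
    @CachePut(value = "books", key = "#isbn")
    public String updateTitle(String isbn, String title) {
        return saveTitleToDatabase(isbn, title);
    }

    // Removes the entry from the cache after the method runs.
    @CacheEvict(value = "books", key = "#isbn")
    public void deleteBook(String isbn) {
        deleteFromDatabase(isbn);
    }

    private String loadTitleFromDatabase(String isbn) { return "title-of-" + isbn; }

    private String saveTitleToDatabase(String isbn, String title) { return title; }

    private void deleteFromDatabase(String isbn) { /* no-op for the sketch */ }
}
```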
The setup above already seems sufficient, but a few problems still need solving.
a. What if the Redis connection errors out or times out? The cache should be able to degrade gracefully in that case.
b. When a connection pool is used, what happens when a connection becomes unusable?
Below is a fairly mature approach: extend CachingConfigurerSupport:
```java
@Configuration
@EnableCaching
public class RedisConfig extends CachingConfigurerSupport {

    private static final long expire = 600;

    @Autowired
    private RedisProperties redisProperties;

    @Bean
    public JedisPoolConfig jedisPoolConfig() {
        JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
        RedisProperties.Pool pool = redisProperties.getPool();
        jedisPoolConfig.setMaxIdle(pool.getMaxIdle());
        jedisPoolConfig.setMaxTotal(pool.getMaxActive());
        jedisPoolConfig.setMinIdle(pool.getMinIdle());
        jedisPoolConfig.setMaxWaitMillis(pool.getMaxWait());
        jedisPoolConfig.setTestOnBorrow(true);
        jedisPoolConfig.setTestWhileIdle(true);
        return jedisPoolConfig;
    }

    @Bean
    public RedisConnectionFactory redisConnectionFactory(JedisPoolConfig jedisPoolConfig) {
        JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory();
        jedisConnectionFactory.setHostName(redisProperties.getHost());
        jedisConnectionFactory.setPort(redisProperties.getPort());
        jedisConnectionFactory.setDatabase(redisProperties.getDatabase());
        jedisConnectionFactory.setTimeout(redisProperties.getTimeout());
        if (null != redisProperties.getPassword()) {
            jedisConnectionFactory.setPassword(redisProperties.getPassword());
        }
        jedisConnectionFactory.setPoolConfig(jedisPoolConfig);
        return jedisConnectionFactory;
    }

    @Bean
    public CacheManager cacheManager(RedisTemplate redisTemplate) {
        RedisCacheManager cacheManager = new RedisCacheManager(redisTemplate);
        cacheManager.setDefaultExpiration(expire);
        cacheManager.setUsePrefix(true);
        return cacheManager;
    }

    @Bean
    public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        StringRedisTemplate template = new StringRedisTemplate(redisConnectionFactory);
        template.setValueSerializer(getValueSerializer());
        template.afterPropertiesSet();
        return template;
    }

    private RedisSerializer getValueSerializer() {
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        return jackson2JsonRedisSerializer;
    }

    @Bean
    @Override
    public KeyGenerator keyGenerator() {
        return new RequestKeyGenerator();
    }

    @Bean
    @Override
    public CacheErrorHandler errorHandler() {
        return new CallbackCacheErrorHandler();
    }
}
```
Now let's look at CallbackCacheErrorHandler:
```java
public class CallbackCacheErrorHandler implements CacheErrorHandler {

    private static final Logger LOGGER = LoggerFactory.getLogger(CallbackCacheErrorHandler.class);

    @Override
    public void handleCacheGetError(RuntimeException exception, Cache cache, Object key) {
        LOGGER.error("cache get error, cacheName:{}, key:{}, msg:", cache.getName(), key, exception);
    }

    @Override
    public void handleCachePutError(RuntimeException exception, Cache cache, Object key, Object value) {
        LOGGER.error("cache put error, cacheName:{}, key:{}, msg:", cache.getName(), key, exception);
    }

    @Override
    public void handleCacheEvictError(RuntimeException exception, Cache cache, Object key) {
        LOGGER.error("cache evict error, cacheName:{}, key:{}, msg:", cache.getName(), key, exception);
    }

    @Override
    public void handleCacheClearError(RuntimeException exception, Cache cache) {
        LOGGER.error("cache clear error, cacheName:{}, msg:", cache.getName(), exception);
    }
}
```
Here an error is merely logged; if you have other requirements, this is the place to extend (a sketch follows). With that, the Spring Boot and Redis integration is done, and everything looks perfect.
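For example, if plain logging is not enough, the handler can expose a failure count so that a broken Redis shows up in monitoring instead of only in log files. A hedged sketch; the class name and the counter are assumptions, not part of the original code:

```java
import java.util.concurrent.atomic.AtomicLong;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cache.Cache;
import org.springframework.cache.interceptor.CacheErrorHandler;

public class MetricsAwareCacheErrorHandler implements CacheErrorHandler {

    private static final Logger LOGGER = LoggerFactory.getLogger(MetricsAwareCacheErrorHandler.class);

    // Exposed so a scheduled job or a metrics registry can publish it.
    private final AtomicLong failureCount = new AtomicLong();

    public long getFailureCount() {
        return failureCount.get();
    }

    @Override
    public void handleCacheGetError(RuntimeException exception, Cache cache, Object key) {
        record("get", cache, key, exception);
    }

    @Override
    public void handleCachePutError(RuntimeException exception, Cache cache, Object key, Object value) {
        record("put", cache, key, exception);
    }

    @Override
    public void handleCacheEvictError(RuntimeException exception, Cache cache, Object key) {
        record("evict", cache, key, exception);
    }

    @Override
    public void handleCacheClearError(RuntimeException exception, Cache cache) {
        record("clear", cache, null, exception);
    }

    // Swallowing the exception keeps the cache layer degradable: the caller falls back to the
    // underlying data source instead of failing just because Redis is down.
    private void record(String operation, Cache cache, Object key, RuntimeException exception) {
        failureCount.incrementAndGet();
        LOGGER.error("cache {} error, cacheName:{}, key:{}", operation, cache.getName(), key, exception);
    }
}
```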
The pitfall of RedisCacheManager's setUsePrefix
First, what is the difference between using a key prefix and not using one? The differences are:
With a prefix, every redis cache key is prepended with the cacheName by default, which keeps the entries of different caches apart.
With a prefix, clearing or invalidating all keys fetches them with the pattern "prefix*" and then deletes them one by one. Without a prefix, clearing or invalidating all keys reads them from a zset that tracks every key of the cache, usually named ${cacheName}~keys. (A small sketch of the two key layouts follows.)
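To make the difference concrete, here is a purely illustrative sketch of where the entries can be found under each mode. The cache name argument and the injected StringRedisTemplate are assumptions, and the exact prefix delimiter depends on the configured RedisCachePrefix:

```java
import java.util.Set;

import org.springframework.data.redis.core.StringRedisTemplate;

public class CacheKeyInspector {

    private final StringRedisTemplate redisTemplate;

    public CacheKeyInspector(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    // With usePrefix=true every entry key starts with the cache name (typically "<cacheName>:<key>"),
    // so a pattern lookup (KEYS here, SCAN in production) finds all entries of that cache.
    public Set<String> entriesWithPrefix(String cacheName) {
        return redisTemplate.keys(cacheName + "*");
    }

    // With usePrefix=false the entry keys carry no marker; the cache instead records every key
    // in a zset named "<cacheName>~keys" and reads it back when clearing.
    public Set<String> entriesWithoutPrefix(String cacheName) {
        return redisTemplate.opsForZSet().range(cacheName + "~keys", 0, -1);
    }
}
```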
Let's confirm this in the source. Inside RedisCache.java, RedisWriteThroughCallback is responsible for writing entries to redis:
```java
static class RedisWriteThroughCallback extends AbstractRedisCacheCallback<byte[]> {

    public RedisWriteThroughCallback(BinaryRedisCacheElement element, RedisCacheMetadata metadata) {
        super(element, metadata);
    }

    @Override
    public byte[] doInRedis(BinaryRedisCacheElement element, RedisConnection connection) throws DataAccessException {

        try {
            lock(connection);

            try {
                byte[] value = connection.get(element.getKeyBytes());

                if (value != null) {
                    return value;
                }

                if (!isClusterConnection(connection)) {
                    connection.watch(element.getKeyBytes());
                    connection.multi();
                }

                value = element.get();

                if (value.length == 0) {
                    connection.del(element.getKeyBytes());
                } else {
                    connection.set(element.getKeyBytes(), value);
                    processKeyExpiration(element, connection);
                    maintainKnownKeys(element, connection);
                }

                if (!isClusterConnection(connection)) {
                    connection.exec();
                }

                return value;
            } catch (RuntimeException e) {
                if (!isClusterConnection(connection)) {
                    connection.discard();
                }
                throw e;
            }
        } finally {
            unlock(connection);
        }
    }
}

protected void maintainKnownKeys(RedisCacheElement element, RedisConnection connection) {

    if (!element.hasKeyPrefix()) {

        connection.zAdd(cacheMetadata.getSetOfKnownKeysKey(), 0, element.getKeyBytes());

        if (!element.isEternal()) {
            connection.expire(cacheMetadata.getSetOfKnownKeysKey(), element.getTimeToLive());
        }
    }
}
```
From the code above, setting a cache entry involves the following steps:
1. Set the key-value pair
2. Set the key's expiration
3. Maintain the key in the zset of known keys (only when no key prefix is used)
And how are all keys cleared?
```java
public void clear() {
    redisOperations.execute(cacheMetadata.usesKeyPrefix() ? new RedisCacheCleanByPrefixCallback(cacheMetadata)
            : new RedisCacheCleanByKeysCallback(cacheMetadata));
}
```
As you can see, a different callback is used depending on whether a prefix is in use.
```java
static class RedisCacheCleanByKeysCallback extends LockingRedisCacheCallback<Void> {

    private static final int PAGE_SIZE = 128;
    private final RedisCacheMetadata metadata;

    RedisCacheCleanByKeysCallback(RedisCacheMetadata metadata) {
        super(metadata);
        this.metadata = metadata;
    }

    @Override
    public Void doInLock(RedisConnection connection) {

        int offset = 0;
        boolean finished = false;

        do {
            Set<byte[]> keys = connection.zRange(metadata.getSetOfKnownKeysKey(), (offset) * PAGE_SIZE,
                    (offset + 1) * PAGE_SIZE - 1);
            finished = keys.size() < PAGE_SIZE;
            offset++;
            if (!keys.isEmpty()) {
                connection.del(keys.toArray(new byte[keys.size()][]));
            }
        } while (!finished);

        connection.del(metadata.getSetOfKnownKeysKey());
        return null;
    }
}

static class RedisCacheCleanByPrefixCallback extends LockingRedisCacheCallback<Void> {

    private static final byte[] REMOVE_KEYS_BY_PATTERN_LUA = new StringRedisSerializer().serialize(
            "local keys = redis.call('KEYS', ARGV[1]); local keysCount = table.getn(keys); if(keysCount > 0) then for _, key in ipairs(keys) do redis.call('del', key); end; end; return keysCount;");
    private static final byte[] WILD_CARD = new StringRedisSerializer().serialize("*");
    private final RedisCacheMetadata metadata;

    public RedisCacheCleanByPrefixCallback(RedisCacheMetadata metadata) {
        super(metadata);
        this.metadata = metadata;
    }

    @Override
    public Void doInLock(RedisConnection connection) throws DataAccessException {

        byte[] prefixToUse = Arrays.copyOf(metadata.getKeyPrefix(), metadata.getKeyPrefix().length + WILD_CARD.length);
        System.arraycopy(WILD_CARD, 0, prefixToUse, metadata.getKeyPrefix().length, WILD_CARD.length);

        if (isClusterConnection(connection)) {

            Set<byte[]> keys = connection.keys(prefixToUse);
            if (!keys.isEmpty()) {
                connection.del(keys.toArray(new byte[keys.size()][]));
            }
        } else {
            connection.eval(REMOVE_KEYS_BY_PATTERN_LUA, ReturnType.INTEGER, 0, prefixToUse);
        }

        return null;
    }
}
```
The source above confirms what the prefix changes. To sum up the pitfall: clearing a prefixed cache relies on the KEYS command (wrapped in a Lua script on non-cluster connections), which can be expensive on a large keyspace, while the prefix-less mode depends on the ${cacheName}~keys zset staying in sync. Choose according to your business needs.
The problem of Redis Cache's default lock
Under high concurrency we found that the put performance of spring redis cache was not great; investigation showed that the put operation uses a lock mechanism, and that the lock wait time cannot be changed.
As RedisWriteThroughCallback above shows, there are lock and unlock operations: lock writes a key to redis, unlock deletes it. In a distributed system this preserves consistency, but it costs performance. Especially when Redis is used purely as a cache and the value for a key is idempotent, the lock could safely be skipped.
The heart of it is the waitForLock method:
```java
protected boolean waitForLock(RedisConnection connection) {

    boolean retry;
    boolean foundLock = false;
    do {
        retry = false;
        if (connection.exists(cacheMetadata.getCacheLockKey())) {
            foundLock = true;
            try {
                Thread.sleep(WAIT_FOR_LOCK_TIMEOUT);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
            retry = true;
        }
    } while (retry);
    return foundLock;
}

protected void lock(RedisConnection connection) {
    waitForLock(connection);
    connection.set(cacheMetadata.getCacheLockKey(), "locked".getBytes());
}

protected void unlock(RedisConnection connection) {
    connection.del(cacheMetadata.getCacheLockKey());
}
```
As you can see, whenever the lock key already exists, each lock attempt sleeps an extra 300ms, which is extremely inefficient for a high-concurrency, high-performance cache. Worse, in the extreme case where unlock fails to delete the key, no key can ever be set or updated again and the wait loop spins forever. Spring provides no hook to override this behavior, which makes it a significant pitfall.
The improvements in Spring-Data-Redis 2.0 RC1
The official issue DATAREDIS-481 addressed the lock problem and gave the cache manager a sweeping overhaul. Let's walk through how annotation-driven caching is used from spring-data-redis 2.0 on. Because the underlying Jedis client has been slow to move since its 2.9.0 release and currently only supports the 2.8.x and 3.x.x lines, Spring now recommends lettuce.
First, the application.yml:
```yaml
spring:
  redis:
    host: 127.0.0.1
    database: 0
    port: 6379
    timeout: 1000
    lettuce:
      pool:
        max-active: 500
        min-idle: 0
        max-idle: 500
        max-wait: 1000
```
Lettuce is now in use, and the jedis configuration is marked as deprecated. The pool exposes only a limited set of parameters; if you want to customize further, configure it yourself as follows:
```java
@Configuration
@EnableCaching
public class RedisConfig extends CachingConfigurerSupport {

    @Autowired
    private RedisProperties redisProperties;

    private long expire = 600L;

    @Bean
    public RedisConnectionFactory redisConnectionFactory() {
        GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
        poolConfig.setMaxIdle(500);
        poolConfig.setMinIdle(0);
        poolConfig.setMaxTotal(500);
        poolConfig.setMaxWaitMillis(1000);
        poolConfig.setTestOnBorrow(true);

        RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration();
        redisStandaloneConfiguration.setHostName(redisProperties.getHost());
        redisStandaloneConfiguration.setPort(redisProperties.getPort());
        redisStandaloneConfiguration.setDatabase(redisProperties.getDatabase());
        if (null != redisProperties.getPassword()) {
            redisStandaloneConfiguration.setPassword(RedisPassword.of(redisProperties.getPassword()));
        }

        LettuceClientConfiguration lettuceClientConfiguration = LettucePoolingClientConfiguration.builder()
                .commandTimeout(Duration.ofMillis(200))
                .shutdownTimeout(Duration.ofMillis(200))
                .poolConfig(poolConfig)
                .build();

        LettuceConnectionFactory lettuceConnectionFactory =
                new LettuceConnectionFactory(redisStandaloneConfiguration, lettuceClientConfiguration);
        lettuceConnectionFactory.setValidateConnection(true);
        return lettuceConnectionFactory;
    }

    @Bean
    public CacheManager cacheManager(RedisConnectionFactory redisConnectionFactory) {
        RedisCacheConfiguration redisCacheConfiguration = RedisCacheConfiguration.defaultCacheConfig()
                .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(getValueSerializer()))
                .entryTtl(Duration.ofSeconds(expire))
                .disableCachingNullValues();
        return RedisCacheManager
                .builder(RedisCacheWriter.nonLockingRedisCacheWriter(redisConnectionFactory))
                .cacheDefaults(redisCacheConfiguration)
                .transactionAware()
                .build();
    }

    private RedisSerializer getValueSerializer() {
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        return jackson2JsonRedisSerializer;
    }

    @Bean
    @Override
    public CacheErrorHandler errorHandler() {
        return new RedisCacheErrorHandler();
    }

    @Override
    public KeyGenerator keyGenerator() {
        return new MyKeyGenerator();
    }
}
```
As you can see, the basic setup is the same, but the RedisCacheManager is built more elegantly and no longer depends directly on a redisTemplate. As for the prefix question, RedisCacheConfiguration.defaultCacheConfig() looks like this:
```java
private RedisCacheConfiguration(Duration ttl, Boolean cacheNullValues, Boolean usePrefix, CacheKeyPrefix keyPrefix,
        SerializationPair<String> keySerializationPair, SerializationPair<?> valueSerializationPair,
        ConversionService conversionService) {

    this.ttl = ttl;
    this.cacheNullValues = cacheNullValues;
    this.usePrefix = usePrefix;
    this.keyPrefix = keyPrefix;
    this.keySerializationPair = keySerializationPair;
    this.valueSerializationPair = (SerializationPair<Object>) valueSerializationPair;
    this.conversionService = conversionService;
}

public static RedisCacheConfiguration defaultCacheConfig() {

    DefaultFormattingConversionService conversionService = new DefaultFormattingConversionService();
    registerDefaultConverters(conversionService);

    return new RedisCacheConfiguration(Duration.ZERO, true, true, CacheKeyPrefix.simple(),
            SerializationPair.fromSerializer(new StringRedisSerializer()),
            SerializationPair.fromSerializer(new JdkSerializationRedisSerializer()), conversionService);
}
```
So the prefix is on by default. It can of course still be disabled via disableKeyPrefix, but the documentation explicitly warns that you need to be careful with it, and it is not recommended.
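A minimal sketch of what disabling the prefix looks like (not recommended, as noted above); the wrapper class name and the TTL are illustrative:

```java
import java.time.Duration;

import org.springframework.data.redis.cache.RedisCacheConfiguration;

public class NoPrefixCacheConfig {

    // Without the prefix, entries of different caches are no longer namespaced in Redis,
    // and clearing a cache by pattern can touch keys that do not belong to it —
    // exactly the risk the documentation warns about.
    public RedisCacheConfiguration noPrefixConfig() {
        return RedisCacheConfiguration.defaultCacheConfig()
                .disableKeyPrefix()
                .entryTtl(Duration.ofSeconds(600));
    }
}
```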
As for the lock, the new version also makes it optional, implemented through RedisCacheWriter:
```java
static RedisCacheWriter nonLockingRedisCacheWriter(RedisConnectionFactory connectionFactory) {

    Assert.notNull(connectionFactory, "ConnectionFactory must not be null!");

    return new DefaultRedisCacheWriter(connectionFactory);
}

static RedisCacheWriter lockingRedisCacheWriter(RedisConnectionFactory connectionFactory) {

    Assert.notNull(connectionFactory, "ConnectionFactory must not be null!");

    return new DefaultRedisCacheWriter(connectionFactory, Duration.ofMillis(50));
}
```
As you can see, lockingRedisCacheWriter sleeps 50ms between lock checks, while the non-locking variant does no lock waiting at all, which gives users a much better set of options.
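If cross-instance consistency matters more to you than put latency, the locking writer can still be chosen explicitly. A sketch, assuming a RedisConnectionFactory bean like the one defined earlier; the configuration class name is illustrative:

```java
import org.springframework.cache.CacheManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.cache.RedisCacheConfiguration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.cache.RedisCacheWriter;
import org.springframework.data.redis.connection.RedisConnectionFactory;

@Configuration
public class LockingCacheConfig {

    // Uses the locking writer (50ms sleep between lock checks) instead of the non-locking one.
    @Bean
    public CacheManager lockingCacheManager(RedisConnectionFactory connectionFactory) {
        return RedisCacheManager
                .builder(RedisCacheWriter.lockingRedisCacheWriter(connectionFactory))
                .cacheDefaults(RedisCacheConfiguration.defaultCacheConfig())
                .build();
    }
}
```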
As for invalidating or clearing all keys, version 2.0 handles it like this:
```java
@Override
public void clean(String name, byte[] pattern) {

    Assert.notNull(name, "Name must not be null!");
    Assert.notNull(pattern, "Pattern must not be null!");

    execute(name, connection -> {

        boolean wasLocked = false;

        try {

            if (isLockingCacheWriter()) {
                doLock(name, connection);
                wasLocked = true;
            }

            byte[][] keys = Optional.ofNullable(connection.keys(pattern)).orElse(Collections.emptySet())
                    .toArray(new byte[0][]);

            if (keys.length > 0) {
                connection.del(keys);
            }
        } finally {

            if (wasLocked && isLockingCacheWriter()) {
                doUnlock(name, connection);
            }
        }

        return "OK";
    });
}
```
It still uses the keys command, so that pitfall remains. Switching to scan may well be the better choice down the road (a sketch follows), but ultimately it should be tailored to your own business needs.
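A hedged sketch of what a SCAN-based cleanup could look like, using RedisConnection's cursor API; the class and method names are illustrative and this is not Spring's implementation:

```java
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.ScanOptions;

public class ScanBasedCacheCleaner {

    // Walks the keyspace incrementally with SCAN instead of blocking the server with KEYS,
    // deleting matches as the cursor advances.
    public void cleanByPattern(RedisConnection connection, String pattern) {
        ScanOptions options = ScanOptions.scanOptions().match(pattern).count(1000).build();
        Cursor<byte[]> cursor = connection.scan(options);
        try {
            while (cursor.hasNext()) {
                connection.del(cursor.next());
            }
        } finally {
            try {
                cursor.close();
            } catch (Exception e) {
                // nothing useful to do here in this sketch
            }
        }
    }
}
```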
Summary:
Pitfalls lurk everywhere in open-source projects, even in Spring. Whatever the version, using a key prefix is the better choice and the direction things are heading. The performance impact of the keys operation has never been fully eliminated; rely on key expiration to keep the keyspace bounded (and avoid keys in production whenever possible). The size of cache keys has a large impact on both performance and storage, so keep them as small as your business allows, for example by hashing them with MD5 and accepting a small collision probability; a sketch of such a key generator follows.
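As an illustration of that last point, a hedged sketch of a KeyGenerator that shortens keys by hashing; the class name is an assumption, not from the original post:

```java
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

import org.springframework.cache.interceptor.KeyGenerator;
import org.springframework.util.DigestUtils;

public class Md5KeyGenerator implements KeyGenerator {

    // Builds a deterministic key from the target class, method and arguments, then shortens it
    // to a 32-character MD5 hex string so Redis stores a small, fixed-size key.
    @Override
    public Object generate(Object target, Method method, Object... params) {
        String raw = target.getClass().getName() + "." + method.getName() + Arrays.deepToString(params);
        return DigestUtils.md5DigestAsHex(raw.getBytes(StandardCharsets.UTF_8));
    }
}
```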