Redis Cluster + Redis Reactive + WebFlux / Mono Cache
Apr 29th 2021
I got a challenge yesterday: implement Redis Cluster with the reactive Redis support in a Java Spring WebFlux application whose service methods return Mono and Flux, and use it for caching.
The requirement is simple: the system has to be easy to use, relying only on the @Cacheable annotation, with no manual cache operations against Redis.
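For context, the end state I was after looks roughly like this: a service method that returns a Mono, cached with nothing but the annotation. This is only a sketch; ProductService, Product and ProductRepository are hypothetical names, and the cache name "oneMinute" refers to one of the caches configured at the end of this post.
import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;

@Service
public class ProductService {

    private final ProductRepository repository;

    public ProductService(ProductRepository repository) {
        this.repository = repository;
    }

    // the backing call only runs on a cache miss; on a hit the cached value
    // is rehydrated into a Mono by the custom serializer shown below
    @Cacheable(value = "oneMinute", key = "#id")
    public Mono<Product> findProduct(String id) {
        return repository.findById(id);
    }
}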
First, you need to add the libraries below:
<!-- REDIS CACHE MANAGER -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
<!-- Spring Framework Caching Support -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-cache</artifactId>
</dependency>
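None of these entries declare a version; I'm assuming version management comes from the Spring Boot parent POM (or an equivalent dependencyManagement block), something like:
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.4.5</version> <!-- or whichever 2.x release your project is on -->
</parent>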
Create a custom serializer
package id.web.asw.filter;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.nio.charset.StandardCharsets;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.SerializationException;
import reactor.core.publisher.Mono;
/**
*
* @author aswzen
*/
@Slf4j
public class CacheCustomSerializer implements RedisSerializer<Object> {
private final ObjectMapper om;
private byte[] dataToSerialize;
public CacheCustomSerializer() {
this.om = new ObjectMapper();
}
@Override
public byte[] serialize(Object t) throws SerializationException {
try {
// NOTE: this relies on the Mono emitting its value synchronously during
// subscribe(); a Mono that completes asynchronously would leave
// dataToSerialize unset at the point of return
Mono<?> obj = (Mono<?>) t;
obj.subscribe(res -> {
try {
this.dataToSerialize = om.writeValueAsBytes(res);
} catch (JsonProcessingException ex) {
log.error(ex.getMessage(), ex);
}
});
return dataToSerialize;
} catch (Exception e) {
// not a Mono: fall back to serializing the value directly
try {
return om.writeValueAsBytes(t);
} catch (JsonProcessingException ex) {
log.error(ex.getMessage(), ex);
}
}
return null;
}
@Override
public Object deserialize(byte[] bytes) throws SerializationException {
if (bytes == null) {
return null;
}
try {
String str = new String(bytes, StandardCharsets.UTF_8);
Object obj = om.readValue(str, Object.class);
// wrap the cached value back into a Mono so callers get the reactive type they expect
return Mono.just(obj);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
return null;
}
}
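To convince yourself the wrap/unwrap behaviour is right, a quick throwaway check like the one below is enough. This is just a scratch class for illustration, not part of the project; it relies on Mono.just emitting synchronously, which is exactly the assumption the serializer makes.
import java.util.Collections;
import reactor.core.publisher.Mono;

public class SerializerSmokeTest {
    public static void main(String[] args) {
        CacheCustomSerializer serializer = new CacheCustomSerializer();
        // Mono.just completes immediately, so subscribe() inside serialize()
        // runs synchronously and the JSON bytes are ready on return
        byte[] bytes = serializer.serialize(Mono.just(Collections.singletonMap("name", "aswzen")));
        // deserialize() re-wraps the plain value in Mono.just(...)
        ((Mono<?>) serializer.deserialize(bytes)).subscribe(System.out::println); // prints {name=aswzen}
    }
}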
Create the cluster configuration
package id.web.asw.configuration;
import java.util.List;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.PropertySource;
import org.springframework.stereotype.Component;
/**
*
* @author aswzen
*/
@Component
@PropertySource("classpath:application-default.yml")
@ConfigurationProperties(prefix = "spring.redis.cluster")
public class RedisClusterConfig {
/*
* spring.redis.cluster.nodes[0] = 127.0.0.1:7379
* spring.redis.cluster.nodes[1] = 127.0.0.1:7380
* ...
*/
private List<String> nodes;
/**
* spring.redis.cluster.max-redirects=3
*/
private int maxRedirects;
/**
* Get initial collection of known
* cluster nodes in format
* {@code host:port}.
*
* @return
*/
public List<String> getNodes() {
return nodes;
}
public void setNodes(List<String> nodes) {
this.nodes = nodes;
}
public int getMaxRedirects() {
return maxRedirects;
}
public void setMaxRedirects(int maxRedirects) {
this.maxRedirects = maxRedirects;
}
}
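For reference, the properties this class binds to would live in application-default.yml, roughly like below (hosts, ports and the password are placeholders); the separate redis.password entry is the one read via @Value in the main configuration class that follows.
spring:
  redis:
    cluster:
      nodes:
        - 127.0.0.1:7379
        - 127.0.0.1:7380
        - 127.0.0.1:7381
      max-redirects: 3

redis:
  password: changeme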
And finally, the Redis configuration itself:
package id.web.asw.configuration;
import id.web.asw.filter.CacheCustomSerializer;
import io.lettuce.core.ClientOptions;
import io.lettuce.core.SocketOptions;
import io.lettuce.core.cluster.ClusterClientOptions;
import io.lettuce.core.cluster.ClusterTopologyRefreshOptions;
import io.lettuce.core.resource.ClientResources;
import io.lettuce.core.resource.DefaultClientResources;
import java.time.Duration;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.cache.interceptor.CacheErrorHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.data.redis.cache.RedisCacheConfiguration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.RedisClusterConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.format.support.DefaultFormattingConversionService;
/*
* Configured by moc.liamg@nezwsa.
* Hope it works well.
*/
@Configuration
@EnableCaching
public class CachingConfig extends CachingConfigurerSupport {
private final org.slf4j.Logger LOGGER = LoggerFactory.getLogger("cache");
@Value("${redis.password}")
private String redisPassword;
@Autowired
private RedisClusterConfig clusterProperties;
@Override
public CacheErrorHandler errorHandler() {
CacheErrorHandler handler = new CacheErrorHandler() {
@Override
public void handleCacheGetError(RuntimeException re, Cache cache, Object o) {
LOGGER.error(cacheLoggerWriter("ERROR", "GET, "+cache.getName()+", " + o + "," + re));
}
@Override
public void handleCachePutError(RuntimeException re, Cache cache, Object o, Object o1) {
LOGGER.error(cacheLoggerWriter("ERROR", "PUT, "+cache.getName()+", " + o + "," + re));
}
@Override
public void handleCacheEvictError(RuntimeException re, Cache cache, Object o) {
LOGGER.error(cacheLoggerWriter("ERROR", "EVICT, "+cache.getName()+", " + o + "," + re));
}
@Override
public void handleCacheClearError(RuntimeException re, Cache cache) {
LOGGER.error(cacheLoggerWriter("ERROR", "CLEAR, "+cache.getName()+", "+ re));
}
};
return handler;
}
/* for reactive redis commands */
@Bean
@Primary
public ReactiveRedisTemplate<String, Object> reactiveRedisTemplate(ReactiveRedisConnectionFactory connectionFactory) {
Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<>(Object.class);
RedisSerializationContext.RedisSerializationContextBuilder<String, Object> builder = RedisSerializationContext.newSerializationContext(new StringRedisSerializer());
RedisSerializationContext<String, Object> context = builder.value(serializer).build();
return new ReactiveRedisTemplate<>(connectionFactory, context);
}
/* for basic redis commands */
@Bean
public RedisTemplate<Object, Object> redisTemplate() {
RedisTemplate<Object, Object> template = new RedisTemplate<>();
template.setKeySerializer(defaultRedisKeySerializer());
template.setHashKeySerializer(defaultRedisKeySerializer());
template.setConnectionFactory(redisConnectionFactory(redisConfiguration()));
return template;
}
@Bean
RedisClusterConfiguration redisConfiguration() {
RedisClusterConfiguration redisClusterConfiguration = new RedisClusterConfiguration(clusterProperties.getNodes());
redisClusterConfiguration.setMaxRedirects(clusterProperties.getMaxRedirects());
redisClusterConfiguration.setPassword(redisPassword);
return redisClusterConfiguration;
}
@Bean
public LettuceConnectionFactory redisConnectionFactory(RedisClusterConfiguration redisConfiguration) {
StringBuilder logsSb = new StringBuilder();
logsSb.append("\r\n:: REDIS CONNECTION STARTING ::\r\n");
logsSb.append("Host\t\t: ").append(redisConfiguration.getClusterNodes()).append("\r\n");
SocketOptions so = SocketOptions.builder()
.keepAlive(false)
.tcpNoDelay(false)
.connectTimeout(Duration.ofSeconds(1))
.build();
/* POOLING CLIENT */
GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
poolConfig.setTestOnReturn(true);
//poolConfig.setMaxIdle(10);
//poolConfig.setMinIdle(1);
//poolConfig.setMaxWaitMillis(100);
//poolConfig.setMaxTotal(1000000);
poolConfig.setTestOnBorrow(true);
poolConfig.setTestWhileIdle(true);
logsSb.append("Status\t\t: Success\r\n");
LOGGER.info(cacheLoggerWriter("CACHE CONNECTION START", logsSb.toString()));
ClusterTopologyRefreshOptions clusterTopologyRefreshOptions = ClusterTopologyRefreshOptions.builder()
.enablePeriodicRefresh()
.enableAllAdaptiveRefreshTriggers()
.refreshPeriod(Duration.ofSeconds(10))
.build();
ClusterClientOptions clusterClientOptions = ClusterClientOptions.builder()
.topologyRefreshOptions(clusterTopologyRefreshOptions)
.autoReconnect(true)
.pingBeforeActivateConnection(true)
.cancelCommandsOnReconnectFailure(true)
.socketOptions(so)
.disconnectedBehavior(ClientOptions.DisconnectedBehavior.REJECT_COMMANDS)
.suspendReconnectOnProtocolFailure(false)
.build();
ClientResources clientResources = DefaultClientResources.builder()
.ioThreadPoolSize(4)
.computationThreadPoolSize(4)
.build();
LettuceClientConfiguration lettuceClientConfiguration = LettucePoolingClientConfiguration.builder()
//.readFrom(ReadFrom.ANY) // deliberately left unset; enabling readFrom here is risky in this setup
.commandTimeout(Duration.ofSeconds(1))
.poolConfig(poolConfig)
.clientResources(clientResources)
.clientOptions(clusterClientOptions).build();
return new LettuceConnectionFactory(redisConfiguration, lettuceClientConfiguration);
}
@Bean
@Override
public CacheManager cacheManager() {
RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig();
DefaultFormattingConversionService conversionService = new DefaultFormattingConversionService();
Set<String> cacheNames = new HashSet<>();
cacheNames.add("tenSecond");
cacheNames.add("oneMinute");
cacheNames.add("tenMinute");
cacheNames.add("oneHour");
cacheNames.add("twelveHour");
cacheNames.add("oneDay");
Map<String, RedisCacheConfiguration> configMap = new HashMap<>();
configMap.put("tenSecond", config
.entryTtl(Duration.ofSeconds(10))
.disableCachingNullValues()
.withConversionService(conversionService)
.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(defaultRedisValueSerializer()))
);
configMap.put("oneMinute", config
.entryTtl(Duration.ofSeconds(60))
.disableCachingNullValues()
.withConversionService(conversionService)
.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(defaultRedisValueSerializer()))
);
configMap.put("tenMinute", config
.entryTtl(Duration.ofSeconds(600))
.disableCachingNullValues()
.withConversionService(conversionService)
.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(defaultRedisValueSerializer()))
);
configMap.put("oneHour", config
.entryTtl(Duration.ofSeconds(3600))
.disableCachingNullValues()
.withConversionService(conversionService)
.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(defaultRedisValueSerializer()))
);
configMap.put("twelveHour", config
.entryTtl(Duration.ofSeconds(43200))
.disableCachingNullValues()
.withConversionService(conversionService)
.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(defaultRedisValueSerializer()))
);
configMap.put("oneDay", config
.entryTtl(Duration.ofSeconds(86400))
.disableCachingNullValues()
.withConversionService(conversionService)
.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(defaultRedisValueSerializer()))
);
LettuceConnectionFactory myLettuce = redisConnectionFactory(redisConfiguration());
RedisCacheManager cacheManager = RedisCacheManager.builder(myLettuce)
.initialCacheNames(cacheNames)
.withInitialCacheConfigurations(configMap)
.build();
cacheManager.afterPropertiesSet();
return cacheManager;
}
@Bean
public RedisSerializer<Object> defaultRedisKeySerializer() {
return new GenericJackson2JsonRedisSerializer();
}
@Bean
public RedisSerializer<Object> defaultRedisValueSerializer() {
return new CacheCustomSerializer();
}
public String cacheLoggerWriter(String title, String msg) {
StringBuilder sb = new StringBuilder();
sb.append("\r\n============START================").append("\r\n");
sb.append("Timestamp\t:").append(new Date()).append("\r\n");
sb.append("Title\t\t:").append(title).append("\r\n");
sb.append("Message\t\t:").append(msg).append("\r\n");
sb.append("============END=================").append("\r\n");
return sb.toString();
}
}
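With that, @Cacheable works hands-free. The reactiveRedisTemplate bean defined above can also be injected for the occasional manual operation; here is a minimal sketch (ManualCacheService, the key handling and the ten-minute TTL are arbitrary illustration, not part of the original setup).
import java.time.Duration;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;

@Service
public class ManualCacheService {

    private final ReactiveRedisTemplate<String, Object> reactiveRedisTemplate;

    public ManualCacheService(ReactiveRedisTemplate<String, Object> reactiveRedisTemplate) {
        this.reactiveRedisTemplate = reactiveRedisTemplate;
    }

    // write a value with an explicit TTL, fully non-blocking
    public Mono<Boolean> put(String key, Object value) {
        return reactiveRedisTemplate.opsForValue().set(key, value, Duration.ofMinutes(10));
    }

    // read it back; completes empty when the key is missing or expired
    public Mono<Object> get(String key) {
        return reactiveRedisTemplate.opsForValue().get(key);
    }
}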