Redis Cluster + Redis Reactive + Web Flux / Mono Cache

Apr 29th 2021

Yesterday I was challenged to implement caching on a Redis Cluster using the reactive Redis client, where the cached methods return Spring WebFlux reactive types (Mono, and also Flux).

The requirement is simple: the system needs to be easy to use. Caching must work with only the @Cacheable annotation, without any manual operations when writing to or reading from Redis.

First, add the following dependencies:

<!-- REDIS CACHE MANAGER -->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
	<groupId>org.springframework.data</groupId>
	<artifactId>spring-data-redis</artifactId>
</dependency>
<dependency>
	<groupId>org.apache.commons</groupId>
	<artifactId>commons-pool2</artifactId>
</dependency>
<dependency>
	<groupId>io.lettuce</groupId>
	<artifactId>lettuce-core</artifactId>
</dependency>
<dependency>
	<groupId>io.projectreactor</groupId>
	<artifactId>reactor-core</artifactId>
</dependency>


<!-- Spring Framework Caching Support -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-cache</artifactId>
</dependency>

Create a custom serializer 

package id.web.asw.filter;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicReference;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.SerializationException;
import reactor.core.publisher.Mono;

/**
 *
 * @author aswzen
 */
@Slf4j
public class CacheCustomSerializer implements RedisSerializer<Object> {

    private final ObjectMapper om;
    private byte[] dataToSerialze;

    public CacheCustomSerializer() {
        this.om = new ObjectMapper();
    }

    @Override
    public byte[] serialize(Object t) throws SerializationException {
        try {
            Mono obj = (Mono) t;
            obj.subscribe(res -> { 
                try {
                    this.dataToSerialze = om.writeValueAsBytes(res);
                } catch (JsonProcessingException ex) {
                    ex.printStackTrace();
                    log.debug(ex.getMessage());
                }
            });
            return dataToSerialze;
        } catch (Exception e) {
            try {
                return om.writeValueAsBytes(t);
            } catch (JsonProcessingException ex) {
                ex.printStackTrace();
                log.debug(ex.getMessage());
            }
        }
        return null;
    }

    @Override
    public Object deserialize(byte[] bytes) throws SerializationException {
        if (bytes == null) {
            return null;
        }

        try {
            String str = new String(bytes, StandardCharsets.UTF_8);
            Object obj = om.readValue(str, Object.class);
            return Mono.just(obj);
        } catch (Exception e) {
            e.printStackTrace();
            log.debug(e.getMessage());
        }
        return null;
    }

}

Create the cluster configuration

package id.web.asw.configuration;

import java.util.List;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.PropertySource;
import org.springframework.stereotype.Component;

/**
 * Holder for the {@code spring.redis.cluster.*} settings, populated through
 * {@link ConfigurationProperties} binding and exposed as a Spring component.
 *
 * <p>NOTE(review): {@code @PropertySource} points at a {@code .yml} file. Spring's
 * reference documentation states that YAML files cannot be loaded through
 * {@code @PropertySource}; confirm this annotation has the intended effect.
 *
 * @author aswzen
 */
@Component
@PropertySource("classpath:application-default.yml")
@ConfigurationProperties(prefix = "spring.redis.cluster")
public class RedisClusterConfig {

    /**
     * Known cluster nodes, one {@code host:port} entry per element, e.g.
     * {@code spring.redis.cluster.nodes[0]=127.0.0.1:7379},
     * {@code spring.redis.cluster.nodes[1]=127.0.0.1:7380}, ...
     */
    private List<String> nodes;

    /** Bound from {@code spring.redis.cluster.max-redirects}, e.g. {@code 3}. */
    private int maxRedirects;

    /**
     * Stores the initial collection of known cluster nodes.
     *
     * @param nodes node addresses in {@code host:port} format
     */
    public void setNodes(final List<String> nodes) {
        this.nodes = nodes;
    }

    /**
     * Returns the initial collection of known cluster nodes.
     *
     * @return node addresses in {@code host:port} format, exactly as they were set
     */
    public List<String> getNodes() {
        return this.nodes;
    }

    /**
     * Stores the configured redirect limit.
     *
     * @param maxRedirects value of {@code spring.redis.cluster.max-redirects}
     */
    public void setMaxRedirects(final int maxRedirects) {
        this.maxRedirects = maxRedirects;
    }

    /**
     * Returns the configured redirect limit.
     *
     * @return value of {@code spring.redis.cluster.max-redirects}
     */
    public int getMaxRedirects() {
        return this.maxRedirects;
    }

}

And finally the redis config itself

package id.web.asw.configuration;

import id.co.tri.bima.core.filter.CacheCustomSerializer;
import io.lettuce.core.ClientOptions;
import io.lettuce.core.SocketOptions;
import io.lettuce.core.cluster.ClusterClientOptions;
import io.lettuce.core.cluster.ClusterTopologyRefreshOptions;
import io.lettuce.core.resource.ClientResources;
import io.lettuce.core.resource.DefaultClientResources;
import java.time.Duration;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.cache.interceptor.CacheErrorHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.data.redis.cache.RedisCacheConfiguration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.RedisClusterConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.format.support.DefaultFormattingConversionService;

/*
* Configured by moc.liamg@nezwsa‎. 
* Hope works well.
 */
/**
 * Spring cache configuration backed by a Redis Cluster through Lettuce.
 *
 * <p>Declares the Lettuce connection factory, a blocking and a reactive Redis
 * template, and a {@link RedisCacheManager} with a fixed set of named caches that
 * differ only in their time-to-live. Cache failures are logged by
 * {@link #errorHandler()} instead of being rethrown.
 *
 * <p>NOTE(review): this file imports {@code CacheCustomSerializer} from
 * {@code id.co.tri.bima.core.filter}; confirm that matches the package the
 * serializer is actually declared in.
 */
@Configuration
@EnableCaching
public class CachingConfig extends CachingConfigurerSupport {

    private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger("cache");

    @Value("${redis.password}")
    private String redisPassword;

    @Autowired
    private RedisClusterConfig clusterProperties;

    /**
     * Returns a handler that logs cache GET/PUT/EVICT/CLEAR failures and does not
     * rethrow them.
     *
     * @return the logging-only error handler
     */
    @Override
    public CacheErrorHandler errorHandler() {
        return new CacheErrorHandler() {
            @Override
            public void handleCacheGetError(RuntimeException re, Cache cache, Object o) {
                // The exception is also passed as the last argument so the stack trace is logged.
                LOGGER.error(cacheLoggerWriter("ERROR", "GET, "+cache.getName()+", " + o + "," + re), re);
            }

            @Override
            public void handleCachePutError(RuntimeException re, Cache cache, Object o, Object o1) {
                LOGGER.error(cacheLoggerWriter("ERROR", "PUT, "+cache.getName()+", " + o + "," + re), re);
            }

            @Override
            public void handleCacheEvictError(RuntimeException re, Cache cache, Object o) {
                LOGGER.error(cacheLoggerWriter("ERROR", "EVICT, "+cache.getName()+", " + o + "," + re), re);
            }

            @Override
            public void handleCacheClearError(RuntimeException re, Cache cache) {
                LOGGER.error(cacheLoggerWriter("ERROR", "CLEAR, "+cache.getName()+", "+ re), re);
            }
        };
    }

    /**
     * Template for reactive Redis commands: String keys, Jackson JSON values.
     *
     * @param connectionFactory the reactive connection factory to use
     * @return the reactive template
     */
    @Bean
    @Primary
    public ReactiveRedisTemplate<String, Object> reactiveRedisTemplate(ReactiveRedisConnectionFactory connectionFactory) {
        Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<>(Object.class);
        RedisSerializationContext.RedisSerializationContextBuilder<String, Object> builder =  RedisSerializationContext.newSerializationContext(new StringRedisSerializer());
        RedisSerializationContext<String, Object> context = builder.value(serializer).build();
        return new ReactiveRedisTemplate<>(connectionFactory, context);
    }

    /**
     * Template for basic (blocking) Redis commands. Only the key and hash-key
     * serializers are set here; value serializers are left untouched.
     *
     * @return the blocking template
     */
    @Bean
    public RedisTemplate<Object, Object> redisTemplate() {
        RedisTemplate<Object, Object> template = new RedisTemplate<>();
        template.setKeySerializer(defaultRedisKeySerializer());
        template.setHashKeySerializer(defaultRedisKeySerializer());
        template.setConnectionFactory(redisConnectionFactory(redisConfiguration()));
        return template;
    }

    /**
     * Cluster topology settings (nodes, max redirects, password) taken from
     * {@link RedisClusterConfig} and the {@code redis.password} property.
     */
    @Bean
    RedisClusterConfiguration redisConfiguration() {
        RedisClusterConfiguration redisClusterConfiguration = new RedisClusterConfiguration(clusterProperties.getNodes());
        redisClusterConfiguration.setMaxRedirects(clusterProperties.getMaxRedirects());
        redisClusterConfiguration.setPassword(redisPassword);
        return redisClusterConfiguration;
    }

    /**
     * Builds the pooled Lettuce connection factory for the cluster.
     *
     * <p>NOTE(review): the {@link ClientResources} created here is a local object
     * that is not registered as a bean, so nothing in this class shuts it down;
     * confirm whether that is acceptable.
     *
     * @param redisConfiguration the cluster nodes, redirects and password to connect with
     * @return the connection factory
     */
    @Bean
    public LettuceConnectionFactory redisConnectionFactory(RedisClusterConfiguration redisConfiguration) {
        StringBuilder logsSb = new StringBuilder();
        logsSb.append("\r\n:: REDIS CONNECTION STARTING ::\r\n");
        logsSb.append("Host\t\t: ").append(redisConfiguration.getClusterNodes()).append("\r\n");

        SocketOptions so = SocketOptions.builder()
            .keepAlive(false)
            .tcpNoDelay(false)
            .connectTimeout(Duration.ofSeconds(1))
            .build();

        /* POOLING CLIENT */
        // Pool sizing (max/min idle, max total, max wait) is not set here; only the
        // validation flags are enabled.
        GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
        poolConfig.setTestOnReturn(true);
        poolConfig.setTestOnBorrow(true);
        poolConfig.setTestWhileIdle(true);

        logsSb.append("Status\t\t: Success\r\n");
        // The formatted banner used to be built and then discarded; actually log it.
        LOGGER.info(cacheLoggerWriter("CACHE CONNECTION START", logsSb.toString()));

        ClusterTopologyRefreshOptions clusterTopologyRefreshOptions =  ClusterTopologyRefreshOptions.builder()
            .enablePeriodicRefresh()
            .enableAllAdaptiveRefreshTriggers()
            .refreshPeriod(Duration.ofSeconds(10))
            .build();

        ClusterClientOptions clusterClientOptions = ClusterClientOptions.builder()
            .topologyRefreshOptions(clusterTopologyRefreshOptions)
            .autoReconnect(true)
            .pingBeforeActivateConnection(true)
            .cancelCommandsOnReconnectFailure(true)
            .socketOptions(so)
            .disconnectedBehavior(ClientOptions.DisconnectedBehavior.REJECT_COMMANDS)
            .suspendReconnectOnProtocolFailure(false)
            .build();

        ClientResources clientResources = DefaultClientResources.builder()
            .ioThreadPoolSize(4)
            .computationThreadPoolSize(4)
            .build();

        // Author's warning: do not configure readFrom (e.g. ReadFrom.ANY) here.
        LettuceClientConfiguration lettuceClientConfiguration = LettucePoolingClientConfiguration.builder()
            .commandTimeout(Duration.ofSeconds(1))
            .poolConfig(poolConfig)
            .clientResources(clientResources)
            .clientOptions(clusterClientOptions).build();

        return new LettuceConnectionFactory(redisConfiguration, lettuceClientConfiguration);
    }

    /**
     * Cache manager exposing the caches {@code tenSecond}, {@code oneMinute},
     * {@code tenMinute}, {@code oneHour}, {@code twelveHour} and {@code oneDay}.
     * All share the same settings (null values not cached, custom value
     * serializer) and differ only in entry TTL.
     *
     * @return the Redis-backed cache manager
     */
    @Bean
    @Override
    public CacheManager cacheManager() {
        Map<String, Duration> ttlByCacheName = new HashMap<>();
        ttlByCacheName.put("tenSecond", Duration.ofSeconds(10));
        ttlByCacheName.put("oneMinute", Duration.ofMinutes(1));
        ttlByCacheName.put("tenMinute", Duration.ofMinutes(10));
        ttlByCacheName.put("oneHour", Duration.ofHours(1));
        ttlByCacheName.put("twelveHour", Duration.ofHours(12));
        ttlByCacheName.put("oneDay", Duration.ofDays(1));

        // Everything except the TTL is identical across caches, so build it once.
        RedisCacheConfiguration baseConfig = RedisCacheConfiguration.defaultCacheConfig()
            .disableCachingNullValues()
            .withConversionService(new DefaultFormattingConversionService())
            .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(defaultRedisValueSerializer()));

        Map<String, RedisCacheConfiguration> configMap = new HashMap<>();
        for (Map.Entry<String, Duration> cacheTtl : ttlByCacheName.entrySet()) {
            // entryTtl returns a new configuration, so baseConfig itself is never modified.
            configMap.put(cacheTtl.getKey(), baseConfig.entryTtl(cacheTtl.getValue()));
        }

        Set<String> cacheNames = new HashSet<>(ttlByCacheName.keySet());

        LettuceConnectionFactory myLettuce = redisConnectionFactory(redisConfiguration());

        RedisCacheManager cacheManager = RedisCacheManager.builder(myLettuce)
            .initialCacheNames(cacheNames)
            .withInitialCacheConfigurations(configMap)
            .build();

        // Initialize explicitly so the caches exist even before container callbacks run.
        cacheManager.afterPropertiesSet();

        return cacheManager;
    }

    /** Serializer used for keys and hash keys of the blocking {@link RedisTemplate}. */
    @Bean
    public RedisSerializer<Object> defaultRedisKeySerializer() {
        return new GenericJackson2JsonRedisSerializer();
    }

    /** Serializer used for the values of every cache created by {@link #cacheManager()}. */
    @Bean
    public RedisSerializer<Object> defaultRedisValueSerializer() {
        return new CacheCustomSerializer();
    }

    /**
     * Formats a multi-line log entry with a timestamp, a title and a message.
     * This method only builds the text; the caller decides whether and at which
     * level to log it.
     *
     * @param title short label of the event
     * @param msg   detail text
     * @return the formatted block
     */
    public String cacheLoggerWriter(String title, String msg) {
        StringBuilder sb = new StringBuilder();
        sb.append("\r\n============START================").append("\r\n");
        sb.append("Timestamp\t:").append(new Date()).append("\r\n");
        sb.append("Title\t\t:").append(title).append("\r\n");
        sb.append("Message\t\t:").append(msg).append("\r\n");
        sb.append("============END=================").append("\r\n");
        return sb.toString();
    }
}