Unable to connect to Redis; nested exception is io.lettuce.core.RedisConnectionException using ReactiveRedisTemplate

29,885

Solution 1

i updated my RedisConfig class as follows :


import java.time.Duration;
import java.util.List;
import java.util.stream.Collectors;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.RedisConfiguration;
import org.springframework.data.redis.connection.RedisNode;
import org.springframework.data.redis.connection.RedisPassword;
import org.springframework.data.redis.connection.RedisSentinelConfiguration;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration;
import org.springframework.data.redis.core.ReactiveRedisOperations;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.StringRedisSerializer;

import com.sap.slh.tax.attributes.determination.springwebfluxdemo.model.TaxDetails;
import com.sap.slh.tax.attributes.determination.springwebfluxdemo.model.TaxLine;

import io.lettuce.core.RedisURI;
import io.pivotal.cfenv.core.CfEnv;

@Configuration
public class RedisConfig {

    CfEnv cfEnv = new CfEnv();
    String tag = "redis";
    String redisHost = cfEnv.findCredentialsByTag(tag).getHost();

    @Bean
    @Primary
    public ReactiveRedisConnectionFactory reactiveRedisConnectionFactory(RedisConfiguration defaultRedisConfig) {
        LettuceClientConfiguration clientConfig = LettucePoolingClientConfiguration.builder()
                .commandTimeout(Duration.ofMillis(60000)).build();
        return new LettuceConnectionFactory(defaultRedisConfig, clientConfig);
    }

    @Bean
    public RedisConfiguration defaultRedisConfig() {
        if (redisHost != null) {
//          RedisStandaloneConfiguration config = new RedisStandaloneConfiguration("127.0.0.1", 6379);
            RedisStandaloneConfiguration config = new RedisStandaloneConfiguration();
            String redisPort = cfEnv.findCredentialsByTag(tag).getPort();
            String redisPassword = cfEnv.findCredentialsByTag(tag).getPassword();
            config.setHostName(redisHost);
            config.setPassword(RedisPassword.of(redisPassword));
            config.setPort(Integer.parseInt(redisPort));
            config.setDatabase(2);
            return config;
        } else {
            RedisSentinelConfiguration config = new RedisSentinelConfiguration();
            String uri = cfEnv.findCredentialsByTag(tag).getUri();
            RedisURI redisURI = RedisURI.create(uri);
            config.master(redisURI.getSentinelMasterId());
            List<RedisNode> nodes = redisURI.getSentinels().stream()
                    .map(redisUri -> populateNode(redisUri.getHost(), redisUri.getPort())).collect(Collectors.toList());
            nodes.forEach(node -> config.addSentinel(node));
            config.setPassword(RedisPassword.of(redisURI.getPassword()));
            config.setDatabase(2);
            return config;
        }
    }

    @Bean
    public ReactiveRedisOperations<TaxDetails, TaxLine> reactiveRedisTemplate(
        ReactiveRedisConnectionFactory factory) {
        StringRedisSerializer keySerializer = new StringRedisSerializer();
        Jackson2JsonRedisSerializer<TaxLine> valueSerializer = new Jackson2JsonRedisSerializer<>(
            TaxLine.class);
        Jackson2JsonRedisSerializer<TaxDetails> valueSerializer1 = new Jackson2JsonRedisSerializer<>(
                TaxDetails.class);
        RedisSerializationContext.RedisSerializationContextBuilder<TaxDetails, TaxLine> builder = RedisSerializationContext
            .newSerializationContext(keySerializer);
        RedisSerializationContext<TaxDetails, TaxLine> context = builder.key(valueSerializer1).value(valueSerializer).build();
        return new ReactiveRedisTemplate<>(factory, context);
    }

    private RedisNode populateNode(String host, Integer port) {
        return new RedisNode(host, port);
    }

}

dependencies for cfEnv:

            <groupId>io.pivotal.cfenv</groupId>
            <artifactId>java-cfenv-boot</artifactId>
            <version>2.1.1.RELEASE</version>
        </dependency>

Solution 2

I had similar problem with Redis running on AWS (EC2 instance). It works after:

sudo vi /etc/redis/redis.conf

  1. Comment line: bind 127.0.0.1 ::1
  2. Set the line protected-mode no
  3. Set the line supervised systemd
  4. sudo systemctl restart redis.service
  5. Check the AWS security groups just in case.

Solution 3

I use this RedisConfig.java and it works for me.

@Configuration
@ConfigurationProperties(prefix = "spring.redis")
@Setter
public class RedisConfig {

    private String host;
    private String password;

    @Bean
    @Primary
    public ReactiveRedisConnectionFactory reactiveRedisConnectionFactory(RedisConfiguration defaultRedisConfig) {
        LettuceClientConfiguration clientConfig = LettuceClientConfiguration.builder()
                .useSsl().build();
        return new LettuceConnectionFactory(defaultRedisConfig, clientConfig);
    }

    @Bean
    public RedisConfiguration defaultRedisConfig() {
        RedisStandaloneConfiguration config = new RedisStandaloneConfiguration();
        config.setHostName(host);
        config.setPassword(RedisPassword.of(password));
        return config;
    }
}
Share:
29,885

Related videos on Youtube

Urandoor Shilpa
Author by

Urandoor Shilpa

Updated on July 09, 2022

Comments

  • Urandoor Shilpa
    Urandoor Shilpa almost 2 years

    I am new to Reactive Programming. i need to connect to Redis to save and get some data. The redis instance is present in cloud. Am using Lettuce Connection factory to establish the connection.

    when establishing the connection to redis, the request fails. Here is my Redis configuration class :

    package com.sap.slh.tax.attributes.determination.springwebfluxdemo.config;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
    import org.springframework.data.redis.connection.RedisPassword;
    import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
    import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
    import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
    import org.springframework.data.redis.core.ReactiveRedisOperations;
    import org.springframework.data.redis.core.ReactiveRedisTemplate;
    import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
    import org.springframework.data.redis.serializer.RedisSerializationContext;
    import org.springframework.data.redis.serializer.StringRedisSerializer;
    import org.springframework.scheduling.annotation.EnableAsync;
    
    import com.sap.slh.tax.attributes.determination.springwebfluxdemo.model.TaxDetails;
    import com.sap.slh.tax.attributes.determination.springwebfluxdemo.model.TaxLine;
    import com.sap.slh.tax.attributes.determination.springwebfluxdemo.util.JsonUtil;
    
    @Configuration
    @EnableAsync
    public class RedisConfig {
        private static final Logger log = LoggerFactory.getLogger(RedisConfig.class);
    
        @Value("${vcap.services.redis.credentials.hostname:10.11.241.101}")
        private String host;
    
        @Value("${vcap.services.redis.credentials.port:36516}")
        private int port;
    
        @Value("$vcap.services.redis.credentials.password:123456788")
        private String password;
    
        @Bean
        public ReactiveRedisConnectionFactory reactiveRedisConnectionFactory() {
            RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration(host, port);
            redisStandaloneConfiguration.setPassword(RedisPassword.of(password));
            redisStandaloneConfiguration.setDatabase(0);
            log.error("Redis standalone configuration{}",JsonUtil.toJsonString(redisStandaloneConfiguration));
            LettuceClientConfiguration clientConfig = LettuceClientConfiguration.builder().build();
            LettuceConnectionFactory lettuceConnectionFactory = new LettuceConnectionFactory(redisStandaloneConfiguration, clientConfig);
            lettuceConnectionFactory.afterPropertiesSet();
            return lettuceConnectionFactory;
    
        }
    
        @Bean
        ReactiveRedisOperations<TaxDetails, TaxLine> redisOperations(
                ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
            Jackson2JsonRedisSerializer<TaxDetails> serializer = new Jackson2JsonRedisSerializer<>(TaxDetails.class);
            Jackson2JsonRedisSerializer<TaxLine> serializer1 = new Jackson2JsonRedisSerializer<>(TaxLine.class);
            RedisSerializationContext.RedisSerializationContextBuilder<TaxDetails, TaxLine> builder = RedisSerializationContext
                    .newSerializationContext(new StringRedisSerializer());
            RedisSerializationContext<TaxDetails, TaxLine> context = builder.key(serializer).value(serializer1).build();
            ;
            return new ReactiveRedisTemplate<>(
                    reactiveRedisConnectionFactory, context);
        }
    }
    

    and here is my look up service class which actually communicates with redis during the request

    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.core.ReactiveRedisOperations;
    import org.springframework.stereotype.Service;
    
    import com.sap.slh.tax.attributes.determination.springwebfluxdemo.model.RedisRepo;
    import com.sap.slh.tax.attributes.determination.springwebfluxdemo.model.TaxDetails;
    import com.sap.slh.tax.attributes.determination.springwebfluxdemo.model.TaxLine;
    import com.sap.slh.tax.attributes.determination.springwebfluxdemo.util.JsonUtil;
    
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;
    
    @Service
    public class RedisTaxLineLookUpService {
        private static final Logger log = LoggerFactory.getLogger(RedisTaxLineLookUpService.class);
    
        @Autowired
        private ReactiveRedisOperations<TaxDetails, TaxLine> redisOperations;
    
        public Flux<TaxLine> get(TaxDetails taxDetails) {
    
            log.info("going to call redis to fetch tax lines{}", JsonUtil.toJsonString(taxDetails));
            return redisOperations.keys(taxDetails).flatMap(redisOperations.opsForValue()::get);
    
        }
    
        public Mono<RedisRepo> set(RedisRepo redisRepo) {
            log.info("going to call redis to save tax lines{}", JsonUtil.toJsonString(redisRepo.getTaxDetails()));
            return redisOperations.opsForValue().set(redisRepo.getTaxDetails(), redisRepo.getTaxLine())
                    .map(__ -> redisRepo);
        }
    
    }
    

    Stack trace :

    2020-03-26T16:27:54.513+0000 [APP/PROC/WEB/0] OUT org.springframework.data.redis.RedisConnectionFailureException: Unable to connect to Redis; nested exception is io.lettuce.core.RedisConnectionException: Unable to connect to 10.11.241.101:36516 | at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory$SharedConnection.getNativeConnection(LettuceConnectionFactory.java:1199) | Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: | Error has been observed at the following site(s): | |_ checkpoint ? Handler com.sap.slh.tax.attributes.determination.springwebfluxdemo.controller.TaxLinesDeterminationController#saveTaxLines(RedisRepo) [DispatcherHandler] | |_ checkpoint ? HTTP POST "/tax/lines/save/" [ExceptionHandlingWebHandler] | Stack trace: | at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory$SharedConnection.getNativeConnection(LettuceConnectionFactory.java:1199) | at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory$SharedConnection.getConnection(LettuceConnectionFactory.java:1178) | at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory.getSharedReactiveConnection(LettuceConnectionFactory.java:952) | at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory.getReactiveConnection(LettuceConnectionFactory.java:429) | at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory.getReactiveConnection(LettuceConnectionFactory.java:94) | at org.springframework.data.redis.core.ReactiveRedisTemplate.lambda$doInConnection$0(ReactiveRedisTemplate.java:198) | at reactor.core.publisher.MonoSupplier.call(MonoSupplier.java:85) | at reactor.core.publisher.FluxUsingWhen.subscribe(FluxUsingWhen.java:80) | at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:55) | at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150) | at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1705) | at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:241) | at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:73) | at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203) | at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203) | at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1705) | at reactor.core.publisher.MonoIgnoreThen$ThenAcceptInner.onNext(MonoIgnoreThen.java:296) | at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1705) | at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:144) | at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1705) | at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:247) | at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:329) | at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:173) | at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:92) | at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67) | at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:73) | at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1705) | at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:144) | at reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onNext(FluxContextStart.java:103) | at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:287) | at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:330) | at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1705) | at reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:160) | at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136) | at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252) | at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136) | at reactor.netty.channel.FluxReceive.terminateReceiver(FluxReceive.java:419) | at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:209) | at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:367) | at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:363) | at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:489) | at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:90) | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)

    Any suggestions or answers would be highly helpful ! Thanks in Advance !