/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.redis.source;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
import org.apache.seatunnel.connectors.seatunnel.redis.client.RedisClient;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisConfig;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
import org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisConnectorException;
import redis.clients.jedis.params.ScanParams;
import redis.clients.jedis.resps.ScanResult;

public class RedisSourceReader
extends AbstractSingleSplitReader<SeaTunnelRow> {
    private final RedisParameters redisParameters;
    private final SingleSplitReaderContext context;
    private final DeserializationSchema<SeaTunnelRow> deserializationSchema;
    private RedisClient redisClient;

    public RedisSourceReader(RedisParameters redisParameters, SingleSplitReaderContext context, DeserializationSchema<SeaTunnelRow> deserializationSchema) {
        this.redisParameters = redisParameters;
        this.context = context;
        this.deserializationSchema = deserializationSchema;
    }

    public void open() throws Exception {
        this.redisClient = this.redisParameters.buildRedisClient();
    }

    public void close() throws IOException {
        if (Objects.nonNull(this.redisClient)) {
            this.redisClient.close();
        }
    }

    @Override
    public void internalPollNext(Collector<SeaTunnelRow> output) throws Exception {
        RedisDataType redisDataType = this.resolveScanType(this.redisParameters.getRedisDataType());
        String cursor = ScanParams.SCAN_POINTER_START;
        String keysPattern = this.redisParameters.getKeysPattern();
        int batchSize = this.redisParameters.getBatchSize();
        do {
            ScanResult<String> scanResult = this.redisClient.scanKeys(cursor, batchSize, keysPattern, redisDataType);
            cursor = scanResult.getCursor();
            List<String> keys = scanResult.getResult();
            this.pollNext(keys, redisDataType, output);
        } while (!ScanParams.SCAN_POINTER_START.equals(cursor));
        this.context.signalNoMoreElement();
    }

    private void pollNext(List<String> keys, RedisDataType dataType, Collector<SeaTunnelRow> output) throws IOException {
        if (CollectionUtils.isEmpty(keys)) {
            return;
        }
        if (RedisDataType.HASH.equals((Object)dataType)) {
            this.pollHashMapToNext(keys, output);
            return;
        }
        if (RedisDataType.STRING.equals((Object)dataType) || RedisDataType.KEY.equals((Object)dataType)) {
            this.pollStringToNext(keys, output);
            return;
        }
        if (RedisDataType.LIST.equals((Object)dataType)) {
            this.pollListToNext(keys, output);
            return;
        }
        if (RedisDataType.SET.equals((Object)dataType)) {
            this.pollSetToNext(keys, output);
            return;
        }
        if (RedisDataType.ZSET.equals((Object)dataType)) {
            this.pollZsetToNext(keys, output);
            return;
        }
        throw new RedisConnectorException((SeaTunnelErrorCode)CommonErrorCode.UNSUPPORTED_DATA_TYPE, "UnSupport redisDataType,only support string,list,hash,set,zset");
    }

    private void pollZsetToNext(List<String> keys, Collector<SeaTunnelRow> output) throws IOException {
        List<List<String>> zSetList = this.redisClient.batchGetZset(keys);
        for (List<String> values : zSetList) {
            for (String value : values) {
                this.pollValueToNext(value, output);
            }
        }
    }

    private void pollSetToNext(List<String> keys, Collector<SeaTunnelRow> output) throws IOException {
        List<Set<String>> setList = this.redisClient.batchGetSet(keys);
        for (Set<String> values : setList) {
            for (String value : values) {
                this.pollValueToNext(value, output);
            }
        }
    }

    private void pollListToNext(List<String> keys, Collector<SeaTunnelRow> output) throws IOException {
        List<List<String>> valueList = this.redisClient.batchGetList(keys);
        for (List<String> values : valueList) {
            for (String value : values) {
                this.pollValueToNext(value, output);
            }
        }
    }

    private void pollStringToNext(List<String> keys, Collector<SeaTunnelRow> output) throws IOException {
        List<String> values = this.redisClient.batchGetString(keys);
        for (String value : values) {
            this.pollValueToNext(value, output);
        }
    }

    private void pollValueToNext(String value, Collector<SeaTunnelRow> output) throws IOException {
        if (this.deserializationSchema == null) {
            output.collect((Object)new SeaTunnelRow(new Object[]{value}));
        } else {
            this.deserializationSchema.deserialize(value.getBytes(), output);
        }
    }

    private void pollHashMapToNext(List<String> keys, Collector<SeaTunnelRow> output) throws IOException {
        List<Map<String, String>> values = this.redisClient.batchGetHash(keys);
        if (this.deserializationSchema == null) {
            for (Map<String, String> value : values) {
                output.collect((Object)new SeaTunnelRow(new Object[]{JsonUtils.toJsonString(value)}));
            }
            return;
        }
        for (Map<String, String> recordsMap : values) {
            if (this.redisParameters.getHashKeyParseMode() == RedisConfig.HashKeyParseMode.KV) {
                this.deserializationSchema.deserialize(JsonUtils.toJsonString(recordsMap).getBytes(), output);
                continue;
            }
            SeaTunnelRow seaTunnelRow = new SeaTunnelRow(new Object[]{JsonUtils.toJsonString(recordsMap)});
            output.collect((Object)seaTunnelRow);
        }
    }

    private RedisDataType resolveScanType(RedisDataType dataType) {
        if (RedisDataType.KEY.equals((Object)dataType)) {
            return RedisDataType.STRING;
        }
        return dataType;
    }
}

