/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.flink;

import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.iotdb.flink.Event;
import org.apache.iotdb.flink.IoTSerializationSchema;
import org.apache.iotdb.flink.options.IoTDBSinkOptions;
import org.apache.iotdb.session.pool.SessionPool;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Flink sink that writes {@link Event}s produced by an {@link IoTSerializationSchema} into IoTDB
 * through a {@link SessionPool}.
 *
 * <p>With the default batch size of {@code 0} every event is written immediately with
 * {@code insertRecord}. With a positive batch size, events are buffered and written with
 * {@code insertRecords} when the buffer reaches the batch size, when the periodic flush task
 * fires, or when the sink is closed.
 *
 * <p>The buffer is shared between the caller of {@link #invoke} and the periodic flush task; all
 * access to it is guarded by synchronizing on the buffer itself.
 *
 * @param <IN> type of the incoming stream elements
 */
public class IoTDBSink<IN> extends RichSinkFunction<IN> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(IoTDBSink.class);
    private final IoTDBSinkOptions options;
    private final IoTSerializationSchema<IN> serializationSchema;
    /** Timeseries options keyed by their full path, used to find TEXT series. */
    private final Map<String, IoTDBSinkOptions.TimeseriesOption> timeseriesOptionMap;
    private transient SessionPool pool;
    private transient ScheduledExecutorService scheduledExecutor;
    /** Number of buffered events that triggers a flush; {@code 0} disables batching. */
    private int batchSize = 0;
    /** Delay and period, in milliseconds, of the periodic flush task. */
    private int flushIntervalMs = 3000;
    /** Pending events in batch mode; also the monitor guarding itself. */
    private final List<Event> batchList;
    private int sessionPoolSize = 2;

    /**
     * Creates a sink in non-batching mode.
     *
     * @param options connection settings and the list of known timeseries
     * @param schema converts incoming elements into {@link Event}s
     */
    public IoTDBSink(IoTDBSinkOptions options, IoTSerializationSchema<IN> schema) {
        this.options = options;
        this.serializationSchema = schema;
        this.batchList = new ArrayList<Event>();
        this.timeseriesOptionMap = new HashMap<String, IoTDBSinkOptions.TimeseriesOption>();
        for (IoTDBSinkOptions.TimeseriesOption timeseriesOption : options.getTimeseriesOptionList()) {
            this.timeseriesOptionMap.put(timeseriesOption.getPath(), timeseriesOption);
        }
    }

    /**
     * Creates the session pool and, in batch mode, starts the periodic flush task.
     * Intended to override {@code RichSinkFunction#open(Configuration)}.
     *
     * @param parameters unused
     */
    public void open(Configuration parameters) throws Exception {
        this.initSession();
        this.initScheduler();
    }

    /** Creates the session pool from the configured host, port, credentials and pool size. */
    void initSession() {
        this.pool = new SessionPool(this.options.getHost(), this.options.getPort(), this.options.getUser(), this.options.getPassword(), this.sessionPoolSize);
    }

    /**
     * Starts a single-threaded executor that flushes the buffer every {@code flushIntervalMs}
     * milliseconds. Does nothing when batching is disabled.
     */
    void initScheduler() {
        if (this.batchSize > 0) {
            this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
            this.scheduledExecutor.scheduleAtFixedRate(() -> {
                try {
                    this.flush();
                }
                catch (Exception e) {
                    // An exception escaping the task would cancel all later runs, so it is
                    // logged and the buffered events are kept for the next attempt.
                    LOG.error("flush error", e);
                }
            }, this.flushIntervalMs, this.flushIntervalMs, TimeUnit.MILLISECONDS);
        }
    }

    /** Replaces the session pool; package-private so a pool can be injected. */
    void setSessionPool(SessionPool pool) {
        this.pool = pool;
    }

    /**
     * Serializes the element and either buffers the resulting event (batch mode) or writes it
     * immediately. Elements for which the schema returns {@code null} are dropped.
     *
     * @param input element to write
     * @param context sink context, unused
     * @throws Exception if serialization or the write to IoTDB fails
     */
    public void invoke(IN input, SinkFunction.Context context) throws Exception {
        Event event = this.serializationSchema.serialize(input);
        if (event == null) {
            return;
        }
        // convertText mutates the event's value list in place, so it is applied exactly once
        // here. Doing it inside flush() would quote TEXT values again each time a failed flush
        // is retried on the same buffered events.
        this.convertText(event.getDevice(), event.getMeasurements(), event.getValues());
        if (this.batchSize > 0) {
            synchronized (this.batchList) {
                this.batchList.add(event);
                if (this.batchList.size() >= this.batchSize) {
                    this.flush();
                }
            }
            return;
        }
        this.pool.insertRecord(event.getDevice(), event.getTimestamp().longValue(), event.getMeasurements(), event.getTypes(), event.getValues());
        LOG.debug("send event successfully");
    }

    /**
     * Sets the number of buffered events that triggers a flush.
     *
     * @param batchSize non-negative batch size; {@code 0} disables batching
     * @return this sink
     * @throws IllegalArgumentException if {@code batchSize} is negative
     */
    public IoTDBSink<IN> withBatchSize(int batchSize) {
        Preconditions.checkArgument(batchSize >= 0);
        this.batchSize = batchSize;
        return this;
    }

    /**
     * Sets the interval of the periodic flush task used in batch mode.
     *
     * @param flushIntervalMs positive interval in milliseconds
     * @return this sink
     * @throws IllegalArgumentException if {@code flushIntervalMs} is not positive
     */
    public IoTDBSink<IN> withFlushIntervalMs(int flushIntervalMs) {
        Preconditions.checkArgument(flushIntervalMs > 0);
        this.flushIntervalMs = flushIntervalMs;
        return this;
    }

    /**
     * Sets the size passed to the session pool created in {@link #initSession()}.
     *
     * @param sessionPoolSize positive pool size
     * @return this sink
     * @throws IllegalArgumentException if {@code sessionPoolSize} is not positive
     */
    public IoTDBSink<IN> withSessionPoolSize(int sessionPoolSize) {
        Preconditions.checkArgument(sessionPoolSize > 0);
        this.sessionPoolSize = sessionPoolSize;
        return this;
    }

    /**
     * Stops the periodic flush task, makes a final best-effort flush of buffered events and
     * closes the session pool. Flush failures are logged, not propagated.
     */
    public void close() {
        // Stop the timer first so no scheduled flush can run against a closed pool.
        if (this.scheduledExecutor != null) {
            this.scheduledExecutor.shutdown();
            try {
                // Give a flush that is already in progress a bounded amount of time to finish.
                if (!this.scheduledExecutor.awaitTermination(this.flushIntervalMs, TimeUnit.MILLISECONDS)) {
                    LOG.warn("scheduled flush did not terminate within {} ms", this.flushIntervalMs);
                }
            }
            catch (InterruptedException e) {
                // Preserve the interrupt for the caller and carry on releasing resources.
                Thread.currentThread().interrupt();
            }
        }
        if (this.pool != null) {
            try {
                this.flush();
            }
            catch (Exception e) {
                LOG.error("flush error", e);
            }
            this.pool.close();
        }
    }

    /**
     * Wraps, in place, every value whose timeseries ({@code device.measurement}) is configured
     * with data type TEXT in single quotes. Does nothing if any argument is {@code null} or the
     * two lists differ in size.
     */
    private void convertText(String device, List<String> measurements, List<Object> values) {
        if (device == null || measurements == null || values == null || measurements.size() != values.size()) {
            return;
        }
        for (int i = 0; i < measurements.size(); ++i) {
            String path = device + "." + measurements.get(i);
            IoTDBSinkOptions.TimeseriesOption timeseriesOption = this.timeseriesOptionMap.get(path);
            if (timeseriesOption != null && TSDataType.TEXT.equals(timeseriesOption.getDataType())) {
                values.set(i, "'" + values.get(i) + "'");
            }
        }
    }

    /**
     * Writes all buffered events with a single {@code insertRecords} call and clears the buffer.
     * The buffer is only cleared after the write returns normally, so events are retained for a
     * later attempt when the write throws. No-op when batching is disabled or the buffer is
     * empty.
     *
     * @throws Exception if the write to IoTDB fails
     */
    private void flush() throws Exception {
        if (this.batchSize <= 0) {
            return;
        }
        synchronized (this.batchList) {
            if (this.batchList.isEmpty()) {
                return;
            }
            int size = this.batchList.size();
            List<String> deviceIds = new ArrayList<String>(size);
            List<Long> timestamps = new ArrayList<Long>(size);
            List<List<String>> measurementsList = new ArrayList<List<String>>(size);
            List<List<TSDataType>> typesList = new ArrayList<List<TSDataType>>(size);
            List<List<Object>> valuesList = new ArrayList<List<Object>>(size);
            // TEXT values were already quoted in invoke(); events are passed through unchanged.
            for (Event event : this.batchList) {
                deviceIds.add(event.getDevice());
                timestamps.add(event.getTimestamp());
                measurementsList.add(event.getMeasurements());
                typesList.add(event.getTypes());
                valuesList.add(event.getValues());
            }
            this.pool.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList);
            LOG.debug("send event successfully");
            this.batchList.clear();
        }
    }
}

