package azkaban.executor;

import azkaban.database.AbstractJdbcLoader;
import azkaban.utils.FileIOUtils;
import azkaban.utils.GZIPUtils;
import azkaban.utils.JSONUtils;
import azkaban.utils.Pair;
import azkaban.utils.Props;
import azkaban.utils.PropsUtils;
import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.ResultSetHandler;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;

/* loaded from: input_file:azkaban/executor/JdbcExecutorLoader.class */
public class JdbcExecutorLoader extends AbstractJdbcLoader implements ExecutorLoader {
    private static final Logger logger = Logger.getLogger(JdbcExecutorLoader.class);
    private AbstractJdbcLoader.EncodingType defaultEncodingType;

    /* loaded from: input_file:azkaban/executor/JdbcExecutorLoader$FetchActiveExecutableFlows.class */
    private static class FetchActiveExecutableFlows implements ResultSetHandler<Map<Integer, Pair<ExecutionReference, ExecutableFlow>>> {
        private static String FETCH_ACTIVE_EXECUTABLE_FLOW = "SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data, ax.host host, ax.port port, ax.update_time axUpdateTime FROM execution_flows ex INNER JOIN active_executing_flows ax ON ex.exec_id = ax.exec_id";

        private FetchActiveExecutableFlows() {
        }

        /* renamed from: handle, reason: merged with bridge method [inline-methods] */
        public Map<Integer, Pair<ExecutionReference, ExecutableFlow>> m26handle(ResultSet resultSet) throws SQLException {
            if (!resultSet.next()) {
                return Collections.emptyMap();
            }
            HashMap hashMap = new HashMap();
            do {
                int i = resultSet.getInt(1);
                int i2 = resultSet.getInt(2);
                byte[] bytes = resultSet.getBytes(3);
                String string = resultSet.getString(4);
                int i3 = resultSet.getInt(5);
                long j = resultSet.getLong(6);
                if (bytes == null) {
                    hashMap.put(Integer.valueOf(i), null);
                } else {
                    try {
                        ExecutableFlow createExecutableFlowFromObject = ExecutableFlow.createExecutableFlowFromObject(AbstractJdbcLoader.EncodingType.fromInteger(i2) == AbstractJdbcLoader.EncodingType.GZIP ? JSONUtils.parseJSONFromString(GZIPUtils.unGzipString(bytes, "UTF-8")) : JSONUtils.parseJSONFromString(new String(bytes, "UTF-8")));
                        ExecutionReference executionReference = new ExecutionReference(i, string, i3);
                        executionReference.setUpdateTime(j);
                        hashMap.put(Integer.valueOf(i), new Pair(executionReference, createExecutableFlowFromObject));
                    } catch (IOException e) {
                        throw new SQLException("Error retrieving flow data " + i, e);
                    }
                }
            } while (resultSet.next());
            return hashMap;
        }
    }

    /* loaded from: input_file:azkaban/executor/JdbcExecutorLoader$FetchExecutableFlows.class */
    private static class FetchExecutableFlows implements ResultSetHandler<List<ExecutableFlow>> {
        private static String FETCH_BASE_EXECUTABLE_FLOW_QUERY = "SELECT exec_id, enc_type, flow_data FROM execution_flows ";
        private static String FETCH_EXECUTABLE_FLOW = "SELECT exec_id, enc_type, flow_data FROM execution_flows WHERE exec_id=?";
        private static String FETCH_ALL_EXECUTABLE_FLOW_HISTORY = "SELECT exec_id, enc_type, flow_data FROM execution_flows ORDER BY exec_id DESC LIMIT ?, ?";
        private static String FETCH_EXECUTABLE_FLOW_HISTORY = "SELECT exec_id, enc_type, flow_data FROM execution_flows WHERE project_id=? AND flow_id=? ORDER BY exec_id DESC LIMIT ?, ?";
        private static String FETCH_EXECUTABLE_FLOW_BY_STATUS = "SELECT exec_id, enc_type, flow_data FROM execution_flows WHERE project_id=? AND flow_id=? AND status=? ORDER BY exec_id DESC LIMIT ?, ?";

        private FetchExecutableFlows() {
        }

        /* renamed from: handle, reason: merged with bridge method [inline-methods] */
        public List<ExecutableFlow> m28handle(ResultSet resultSet) throws SQLException {
            if (!resultSet.next()) {
                return Collections.emptyList();
            }
            ArrayList arrayList = new ArrayList();
            do {
                int i = resultSet.getInt(1);
                int i2 = resultSet.getInt(2);
                byte[] bytes = resultSet.getBytes(3);
                if (bytes != null) {
                    try {
                        arrayList.add(ExecutableFlow.createExecutableFlowFromObject(AbstractJdbcLoader.EncodingType.fromInteger(i2) == AbstractJdbcLoader.EncodingType.GZIP ? JSONUtils.parseJSONFromString(GZIPUtils.unGzipString(bytes, "UTF-8")) : JSONUtils.parseJSONFromString(new String(bytes, "UTF-8"))));
                    } catch (IOException e) {
                        throw new SQLException("Error retrieving flow data " + i, e);
                    }
                }
            } while (resultSet.next());
            return arrayList;
        }
    }

    /* loaded from: input_file:azkaban/executor/JdbcExecutorLoader$FetchExecutableJobAttachmentsHandler.class */
    private static class FetchExecutableJobAttachmentsHandler implements ResultSetHandler<String> {
        private static String FETCH_ATTACHMENTS_EXECUTABLE_NODE = "SELECT attachments FROM execution_jobs WHERE exec_id=? AND job_id=?";

        private FetchExecutableJobAttachmentsHandler() {
        }

        /* renamed from: handle, reason: merged with bridge method [inline-methods] */
        public String m30handle(ResultSet resultSet) throws SQLException {
            String str = null;
            if (resultSet.next()) {
                try {
                    byte[] bytes = resultSet.getBytes(1);
                    if (bytes != null) {
                        str = GZIPUtils.unGzipString(bytes, "UTF-8");
                    }
                } catch (IOException e) {
                    throw new SQLException("Error decoding job attachments", e);
                }
            }
            return str;
        }
    }

    /* loaded from: input_file:azkaban/executor/JdbcExecutorLoader$FetchExecutableJobHandler.class */
    private static class FetchExecutableJobHandler implements ResultSetHandler<List<ExecutableJobInfo>> {
        private static String FETCH_EXECUTABLE_NODE = "SELECT exec_id, project_id, version, flow_id, job_id, start_time, end_time, status, attempt FROM execution_jobs WHERE exec_id=? AND job_id=? AND attempt_id=?";
        private static String FETCH_EXECUTABLE_NODE_ATTEMPTS = "SELECT exec_id, project_id, version, flow_id, job_id, start_time, end_time, status, attempt FROM execution_jobs WHERE exec_id=? AND job_id=?";
        private static String FETCH_PROJECT_EXECUTABLE_NODE = "SELECT exec_id, project_id, version, flow_id, job_id, start_time, end_time, status, attempt FROM execution_jobs WHERE project_id=? AND job_id=? ORDER BY exec_id DESC LIMIT ?, ? ";

        private FetchExecutableJobHandler() {
        }

        /* renamed from: handle, reason: merged with bridge method [inline-methods] */
        public List<ExecutableJobInfo> m32handle(ResultSet resultSet) throws SQLException {
            if (!resultSet.next()) {
                return Collections.emptyList();
            }
            ArrayList arrayList = new ArrayList();
            do {
                arrayList.add(new ExecutableJobInfo(resultSet.getInt(1), resultSet.getInt(2), resultSet.getInt(3), resultSet.getString(4), resultSet.getString(5), resultSet.getLong(6), resultSet.getLong(7), Status.fromInteger(resultSet.getInt(8)), resultSet.getInt(9)));
            } while (resultSet.next());
            return arrayList;
        }
    }

    /* loaded from: input_file:azkaban/executor/JdbcExecutorLoader$FetchExecutableJobPropsHandler.class */
    private static class FetchExecutableJobPropsHandler implements ResultSetHandler<Pair<Props, Props>> {
        private static String FETCH_OUTPUT_PARAM_EXECUTABLE_NODE = "SELECT output_params FROM execution_jobs WHERE exec_id=? AND job_id=?";
        private static String FETCH_INPUT_PARAM_EXECUTABLE_NODE = "SELECT input_params FROM execution_jobs WHERE exec_id=? AND job_id=?";
        private static String FETCH_INPUT_OUTPUT_PARAM_EXECUTABLE_NODE = "SELECT input_params, output_params FROM execution_jobs WHERE exec_id=? AND job_id=?";

        private FetchExecutableJobPropsHandler() {
        }

        /* renamed from: handle, reason: merged with bridge method [inline-methods] */
        public Pair<Props, Props> m34handle(ResultSet resultSet) throws SQLException {
            if (!resultSet.next()) {
                return new Pair<>(null, null);
            }
            if (resultSet.getMetaData().getColumnCount() <= 1) {
                byte[] bytes = resultSet.getBytes(1);
                Props props = null;
                if (bytes != null) {
                    try {
                        props = PropsUtils.fromHierarchicalMap((Map) JSONUtils.parseJSONFromString(GZIPUtils.unGzipString(bytes, "UTF-8")));
                    } catch (IOException e) {
                        throw new SQLException("Error decoding param data", e);
                    }
                }
                return new Pair<>(props, null);
            }
            byte[] bytes2 = resultSet.getBytes(1);
            byte[] bytes3 = resultSet.getBytes(2);
            Props props2 = null;
            Props props3 = null;
            if (bytes2 != null) {
                try {
                    props2 = PropsUtils.fromHierarchicalMap((Map) JSONUtils.parseJSONFromString(GZIPUtils.unGzipString(bytes2, "UTF-8")));
                } catch (IOException e2) {
                    throw new SQLException("Error decoding param data", e2);
                }
            }
            if (bytes3 != null) {
                props3 = PropsUtils.fromHierarchicalMap((Map) JSONUtils.parseJSONFromString(GZIPUtils.unGzipString(bytes3, "UTF-8")));
            }
            return new Pair<>(props2, props3);
        }
    }

    /* loaded from: input_file:azkaban/executor/JdbcExecutorLoader$FetchLogsHandler.class */
    private static class FetchLogsHandler implements ResultSetHandler<FileIOUtils.LogData> {
        private static String FETCH_LOGS = "SELECT exec_id, name, attempt, enc_type, start_byte, end_byte, log FROM execution_logs WHERE exec_id=? AND name=? AND attempt=? AND end_byte > ? AND start_byte <= ? ORDER BY start_byte";
        private int startByte;
        private int endByte;

        public FetchLogsHandler(int i, int i2) {
            this.startByte = i;
            this.endByte = i2;
        }

        /* renamed from: handle, reason: merged with bridge method [inline-methods] */
        public FileIOUtils.LogData m36handle(ResultSet resultSet) throws SQLException {
            if (!resultSet.next()) {
                return null;
            }
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            do {
                resultSet.getInt(3);
                AbstractJdbcLoader.EncodingType fromInteger = AbstractJdbcLoader.EncodingType.fromInteger(resultSet.getInt(4));
                int i = resultSet.getInt(5);
                int i2 = resultSet.getInt(6);
                byte[] bytes = resultSet.getBytes(7);
                int i3 = this.startByte > i ? this.startByte - i : 0;
                int i4 = this.endByte < i2 ? (this.endByte - i) - i3 : (i2 - i) - i3;
                try {
                    byte[] bArr = bytes;
                    if (fromInteger == AbstractJdbcLoader.EncodingType.GZIP) {
                        bArr = GZIPUtils.unGzipBytes(bytes);
                    }
                    byteArrayOutputStream.write(bArr, i3, i4);
                } catch (IOException e) {
                    throw new SQLException(e);
                }
            } while (resultSet.next());
            byte[] byteArray = byteArrayOutputStream.toByteArray();
            Pair<Integer, Integer> utf8Range = FileIOUtils.getUtf8Range(byteArray, 0, byteArray.length);
            return new FileIOUtils.LogData(this.startByte + utf8Range.getFirst().intValue(), utf8Range.getSecond().intValue(), new String(byteArray, utf8Range.getFirst().intValue(), utf8Range.getSecond().intValue()));
        }
    }

    /* loaded from: input_file:azkaban/executor/JdbcExecutorLoader$IntHandler.class */
    private static class IntHandler implements ResultSetHandler<Integer> {
        private static String NUM_EXECUTIONS = "SELECT COUNT(1) FROM execution_flows";
        private static String NUM_FLOW_EXECUTIONS = "SELECT COUNT(1) FROM execution_flows WHERE project_id=? AND flow_id=?";
        private static String NUM_JOB_EXECUTIONS = "SELECT COUNT(1) FROM execution_jobs WHERE project_id=? AND job_id=?";

        private IntHandler() {
        }

        /* renamed from: handle, reason: merged with bridge method [inline-methods] */
        public Integer m38handle(ResultSet resultSet) throws SQLException {
            if (resultSet.next()) {
                return Integer.valueOf(resultSet.getInt(1));
            }
            return 0;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:azkaban/executor/JdbcExecutorLoader$LastInsertID.class */
    public static class LastInsertID implements ResultSetHandler<Long> {
        private static String LAST_INSERT_ID = "SELECT LAST_INSERT_ID()";

        private LastInsertID() {
        }

        /* renamed from: handle, reason: merged with bridge method [inline-methods] */
        public Long m40handle(ResultSet resultSet) throws SQLException {
            if (resultSet.next()) {
                return Long.valueOf(resultSet.getLong(1));
            }
            return -1L;
        }
    }

    public JdbcExecutorLoader(Props props) {
        super(props);
        this.defaultEncodingType = AbstractJdbcLoader.EncodingType.GZIP;
    }

    public AbstractJdbcLoader.EncodingType getDefaultEncodingType() {
        return this.defaultEncodingType;
    }

    public void setDefaultEncodingType(AbstractJdbcLoader.EncodingType encodingType) {
        this.defaultEncodingType = encodingType;
    }

    @Override // azkaban.executor.ExecutorLoader
    public synchronized void uploadExecutableFlow(ExecutableFlow executableFlow) throws ExecutorManagerException {
        Connection connection = getConnection();
        try {
            try {
                uploadExecutableFlow(connection, executableFlow, this.defaultEncodingType);
                DbUtils.closeQuietly(connection);
            } catch (IOException e) {
                throw new ExecutorManagerException("Error uploading flow", e);
            }
        } catch (Throwable th) {
            DbUtils.closeQuietly(connection);
            throw th;
        }
    }

    private synchronized void uploadExecutableFlow(Connection connection, ExecutableFlow executableFlow, AbstractJdbcLoader.EncodingType encodingType) throws ExecutorManagerException, IOException {
        QueryRunner queryRunner = new QueryRunner();
        long currentTimeMillis = System.currentTimeMillis();
        try {
            executableFlow.setStatus(Status.PREPARING);
            queryRunner.update(connection, "INSERT INTO execution_flows (project_id, flow_id, version, status, submit_time, submit_user, update_time) values (?,?,?,?,?,?,?)", new Object[]{Integer.valueOf(executableFlow.getProjectId()), executableFlow.getFlowId(), Integer.valueOf(executableFlow.getVersion()), Integer.valueOf(Status.PREPARING.getNumVal()), Long.valueOf(currentTimeMillis), executableFlow.getSubmitUser(), Long.valueOf(currentTimeMillis)});
            connection.commit();
            long longValue = ((Long) queryRunner.query(connection, LastInsertID.LAST_INSERT_ID, new LastInsertID())).longValue();
            if (longValue == -1) {
                throw new ExecutorManagerException("Execution id is not properly created.");
            }
            logger.info("Flow given " + executableFlow.getFlowId() + " given id " + longValue);
            executableFlow.setExecutionId((int) longValue);
            updateExecutableFlow(connection, executableFlow, encodingType);
        } catch (SQLException e) {
            throw new ExecutorManagerException("Error creating execution.", e);
        }
    }

    @Override // azkaban.executor.ExecutorLoader
    public void updateExecutableFlow(ExecutableFlow executableFlow) throws ExecutorManagerException {
        Connection connection = getConnection();
        try {
            updateExecutableFlow(connection, executableFlow, this.defaultEncodingType);
            DbUtils.closeQuietly(connection);
        } catch (Throwable th) {
            DbUtils.closeQuietly(connection);
            throw th;
        }
    }

    private void updateExecutableFlow(Connection connection, ExecutableFlow executableFlow, AbstractJdbcLoader.EncodingType encodingType) throws ExecutorManagerException {
        QueryRunner queryRunner = new QueryRunner();
        try {
            byte[] bytes = JSONUtils.toJSON(executableFlow.toObject()).getBytes("UTF-8");
            byte[] bArr = bytes;
            if (encodingType == AbstractJdbcLoader.EncodingType.GZIP) {
                bArr = GZIPUtils.gzipBytes(bytes);
            }
            try {
                queryRunner.update(connection, "UPDATE execution_flows SET status=?,update_time=?,start_time=?,end_time=?,enc_type=?,flow_data=? WHERE exec_id=?", new Object[]{Integer.valueOf(executableFlow.getStatus().getNumVal()), Long.valueOf(executableFlow.getUpdateTime()), Long.valueOf(executableFlow.getStartTime()), Long.valueOf(executableFlow.getEndTime()), Integer.valueOf(encodingType.getNumVal()), bArr, Integer.valueOf(executableFlow.getExecutionId())});
                connection.commit();
            } catch (SQLException e) {
                throw new ExecutorManagerException("Error updating flow.", e);
            }
        } catch (IOException e2) {
            throw new ExecutorManagerException("Error encoding the execution flow.");
        }
    }

    @Override // azkaban.executor.ExecutorLoader
    public ExecutableFlow fetchExecutableFlow(int i) throws ExecutorManagerException {
        try {
            return (ExecutableFlow) ((List) createQueryRunner().query(FetchExecutableFlows.FETCH_EXECUTABLE_FLOW, new FetchExecutableFlows(), new Object[]{Integer.valueOf(i)})).get(0);
        } catch (SQLException e) {
            throw new ExecutorManagerException("Error fetching flow id " + i, e);
        }
    }

    @Override // azkaban.executor.ExecutorLoader
    public Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchActiveFlows() throws ExecutorManagerException {
        try {
            return (Map) createQueryRunner().query(FetchActiveExecutableFlows.FETCH_ACTIVE_EXECUTABLE_FLOW, new FetchActiveExecutableFlows());
        } catch (SQLException e) {
            throw new ExecutorManagerException("Error fetching active flows", e);
        }
    }

    @Override // azkaban.executor.ExecutorLoader
    public int fetchNumExecutableFlows() throws ExecutorManagerException {
        try {
            return ((Integer) createQueryRunner().query(IntHandler.NUM_EXECUTIONS, new IntHandler())).intValue();
        } catch (SQLException e) {
            throw new ExecutorManagerException("Error fetching num executions", e);
        }
    }

    @Override // azkaban.executor.ExecutorLoader
    public int fetchNumExecutableFlows(int i, String str) throws ExecutorManagerException {
        try {
            return ((Integer) createQueryRunner().query(IntHandler.NUM_FLOW_EXECUTIONS, new IntHandler(), new Object[]{Integer.valueOf(i), str})).intValue();
        } catch (SQLException e) {
            throw new ExecutorManagerException("Error fetching num executions", e);
        }
    }

    @Override // azkaban.executor.ExecutorLoader
    public int fetchNumExecutableNodes(int i, String str) throws ExecutorManagerException {
        try {
            return ((Integer) createQueryRunner().query(IntHandler.NUM_JOB_EXECUTIONS, new IntHandler(), new Object[]{Integer.valueOf(i), str})).intValue();
        } catch (SQLException e) {
            throw new ExecutorManagerException("Error fetching num executions", e);
        }
    }

    @Override // azkaban.executor.ExecutorLoader
    public List<ExecutableFlow> fetchFlowHistory(int i, String str, int i2, int i3) throws ExecutorManagerException {
        try {
            return (List) createQueryRunner().query(FetchExecutableFlows.FETCH_EXECUTABLE_FLOW_HISTORY, new FetchExecutableFlows(), new Object[]{Integer.valueOf(i), str, Integer.valueOf(i2), Integer.valueOf(i3)});
        } catch (SQLException e) {
            throw new ExecutorManagerException("Error fetching active flows", e);
        }
    }

    @Override // azkaban.executor.ExecutorLoader
    public List<ExecutableFlow> fetchFlowHistory(int i, String str, int i2, int i3, Status status) throws ExecutorManagerException {
        try {
            return (List) createQueryRunner().query(FetchExecutableFlows.FETCH_EXECUTABLE_FLOW_BY_STATUS, new FetchExecutableFlows(), new Object[]{Integer.valueOf(i), str, Integer.valueOf(status.getNumVal()), Integer.valueOf(i2), Integer.valueOf(i3)});
        } catch (SQLException e) {
            throw new ExecutorManagerException("Error fetching active flows", e);
        }
    }

    @Override // azkaban.executor.ExecutorLoader
    public List<ExecutableFlow> fetchFlowHistory(int i, int i2) throws ExecutorManagerException {
        try {
            return (List) createQueryRunner().query(FetchExecutableFlows.FETCH_ALL_EXECUTABLE_FLOW_HISTORY, new FetchExecutableFlows(), new Object[]{Integer.valueOf(i), Integer.valueOf(i2)});
        } catch (SQLException e) {
            throw new ExecutorManagerException("Error fetching active flows", e);
        }
    }

    @Override // azkaban.executor.ExecutorLoader
    public List<ExecutableFlow> fetchFlowHistory(String str, String str2, String str3, int i, long j, long j2, int i2, int i3) throws ExecutorManagerException {
        String str4;
        String str5;
        String str6;
        String str7;
        String str8 = FetchExecutableFlows.FETCH_BASE_EXECUTABLE_FLOW_QUERY;
        ArrayList arrayList = new ArrayList();
        boolean z = true;
        if (str != null && !str.isEmpty()) {
            str8 = str8 + " ef JOIN projects p ON ef.project_id = p.id WHERE name LIKE ?";
            arrayList.add('%' + str + '%');
            z = false;
        }
        if (str2 != null && !str2.isEmpty()) {
            if (z) {
                str7 = str8 + " WHERE ";
                z = false;
            } else {
                str7 = str8 + " AND ";
            }
            str8 = str7 + " flow_id LIKE ?";
            arrayList.add('%' + str2 + '%');
        }
        if (str3 != null && !str3.isEmpty()) {
            if (z) {
                str6 = str8 + " WHERE ";
                z = false;
            } else {
                str6 = str8 + " AND ";
            }
            str8 = str6 + " submit_user LIKE ?";
            arrayList.add('%' + str3 + '%');
        }
        if (i != 0) {
            if (z) {
                str5 = str8 + " WHERE ";
                z = false;
            } else {
                str5 = str8 + " AND ";
            }
            str8 = str5 + " status = ?";
            arrayList.add(Integer.valueOf(i));
        }
        if (j > 0) {
            if (z) {
                str4 = str8 + " WHERE ";
                z = false;
            } else {
                str4 = str8 + " AND ";
            }
            str8 = str4 + " start_time > ?";
            arrayList.add(Long.valueOf(j));
        }
        if (j2 > 0) {
            str8 = (z ? str8 + " WHERE " : str8 + " AND ") + " end_time < ?";
            arrayList.add(Long.valueOf(j2));
        }
        if (i2 > -1 && i3 > 0) {
            str8 = str8 + "  ORDER BY exec_id DESC LIMIT ?, ?";
            arrayList.add(Integer.valueOf(i2));
            arrayList.add(Integer.valueOf(i3));
        }
        try {
            return (List) createQueryRunner().query(str8, new FetchExecutableFlows(), arrayList.toArray());
        } catch (SQLException e) {
            throw new ExecutorManagerException("Error fetching active flows", e);
        }
    }

    @Override // azkaban.executor.ExecutorLoader
    public void addActiveExecutableReference(ExecutionReference executionReference) throws ExecutorManagerException {
        try {
            createQueryRunner().update("INSERT INTO active_executing_flows (exec_id, host, port, update_time) values (?,?,?,?)", new Object[]{Integer.valueOf(executionReference.getExecId()), executionReference.getHost(), Integer.valueOf(executionReference.getPort()), Long.valueOf(executionReference.getUpdateTime())});
        } catch (SQLException e) {
            throw new ExecutorManagerException("Error updating active flow reference " + executionReference.getExecId(), e);
        }
    }

    @Override // azkaban.executor.ExecutorLoader
    public void removeActiveExecutableReference(int i) throws ExecutorManagerException {
        try {
            createQueryRunner().update("DELETE FROM active_executing_flows WHERE exec_id=?", Integer.valueOf(i));
        } catch (SQLException e) {
            throw new ExecutorManagerException("Error deleting active flow reference " + i, e);
        }
    }

    @Override // azkaban.executor.ExecutorLoader
    public boolean updateExecutableReference(int i, long j) throws ExecutorManagerException {
        try {
            return createQueryRunner().update("UPDATE active_executing_flows set update_time=? WHERE exec_id=?", new Object[]{Long.valueOf(j), Integer.valueOf(i)}) > 0;
        } catch (SQLException e) {
            throw new ExecutorManagerException("Error deleting active flow reference " + i, e);
        }
    }

    @Override // azkaban.executor.ExecutorLoader
    public void uploadExecutableNode(ExecutableNode executableNode, Props props) throws ExecutorManagerException {
        byte[] bArr = null;
        if (props != null) {
            try {
                bArr = GZIPUtils.gzipString(JSONUtils.toJSON(PropsUtils.toHierarchicalMap(props)), "UTF-8");
            } catch (IOException e) {
                throw new ExecutorManagerException("Error encoding input params");
            }
        }
        ExecutableFlow executableFlow = executableNode.getExecutableFlow();
        String flowPath = executableNode.getParentFlow().getFlowPath();
        System.out.println("Uploading flowId " + flowPath);
        try {
            createQueryRunner().update("INSERT INTO execution_jobs (exec_id, project_id, version, flow_id, job_id, start_time, end_time, status, input_params, attempt) VALUES (?,?,?,?,?,?,?,?,?,?)", new Object[]{Integer.valueOf(executableFlow.getExecutionId()), Integer.valueOf(executableFlow.getProjectId()), Integer.valueOf(executableFlow.getVersion()), flowPath, executableNode.getId(), Long.valueOf(executableNode.getStartTime()), Long.valueOf(executableNode.getEndTime()), Integer.valueOf(executableNode.getStatus().getNumVal()), bArr, Integer.valueOf(executableNode.getAttempt())});
        } catch (SQLException e2) {
            throw new ExecutorManagerException("Error writing job " + executableNode.getId(), e2);
        }
    }

    @Override // azkaban.executor.ExecutorLoader
    public void updateExecutableNode(ExecutableNode executableNode) throws ExecutorManagerException {
        byte[] bArr = null;
        Props outputProps = executableNode.getOutputProps();
        if (outputProps != null) {
            try {
                bArr = GZIPUtils.gzipString(JSONUtils.toJSON(PropsUtils.toHierarchicalMap(outputProps)), "UTF-8");
            } catch (IOException e) {
                throw new ExecutorManagerException("Error encoding input params");
            }
        }
        try {
            createQueryRunner().update("UPDATE execution_jobs SET start_time=?, end_time=?, status=?, output_params=? WHERE exec_id=? AND flow_id=? AND job_id=? AND attempt=?", new Object[]{Long.valueOf(executableNode.getStartTime()), Long.valueOf(executableNode.getEndTime()), Integer.valueOf(executableNode.getStatus().getNumVal()), bArr, Integer.valueOf(executableNode.getExecutableFlow().getExecutionId()), executableNode.getParentFlow().getFlowPath(), executableNode.getId(), Integer.valueOf(executableNode.getAttempt())});
        } catch (SQLException e2) {
            throw new ExecutorManagerException("Error updating job " + executableNode.getId(), e2);
        }
    }

    @Override // azkaban.executor.ExecutorLoader
    public List<ExecutableJobInfo> fetchJobInfoAttempts(int i, String str) throws ExecutorManagerException {
        try {
            List<ExecutableJobInfo> list = (List) createQueryRunner().query(FetchExecutableJobHandler.FETCH_EXECUTABLE_NODE_ATTEMPTS, new FetchExecutableJobHandler(), new Object[]{Integer.valueOf(i), str});
            if (list == null) {
                return null;
            }
            if (list.isEmpty()) {
                return null;
            }
            return list;
        } catch (SQLException e) {
            throw new ExecutorManagerException("Error querying job info " + str, e);
        }
    }

    @Override // azkaban.executor.ExecutorLoader
    public ExecutableJobInfo fetchJobInfo(int i, String str, int i2) throws ExecutorManagerException {
        try {
            List list = (List) createQueryRunner().query(FetchExecutableJobHandler.FETCH_EXECUTABLE_NODE, new FetchExecutableJobHandler(), new Object[]{Integer.valueOf(i), str, Integer.valueOf(i2)});
            if (list == null || list.isEmpty()) {
                return null;
            }
            return (ExecutableJobInfo) list.get(0);
        } catch (SQLException e) {
            throw new ExecutorManagerException("Error querying job info " + str, e);
        }
    }

    @Override // azkaban.executor.ExecutorLoader
    public Props fetchExecutionJobInputProps(int i, String str) throws ExecutorManagerException {
        try {
            return (Props) ((Pair) createQueryRunner().query(FetchExecutableJobPropsHandler.FETCH_INPUT_PARAM_EXECUTABLE_NODE, new FetchExecutableJobPropsHandler(), new Object[]{Integer.valueOf(i), str})).getFirst();
        } catch (SQLException e) {
            throw new ExecutorManagerException("Error querying job params " + i + " " + str, e);
        }
    }

    @Override // azkaban.executor.ExecutorLoader
    public Props fetchExecutionJobOutputProps(int i, String str) throws ExecutorManagerException {
        try {
            return (Props) ((Pair) createQueryRunner().query(FetchExecutableJobPropsHandler.FETCH_OUTPUT_PARAM_EXECUTABLE_NODE, new FetchExecutableJobPropsHandler(), new Object[]{Integer.valueOf(i), str})).getFirst();
        } catch (SQLException e) {
            throw new ExecutorManagerException("Error querying job params " + i + " " + str, e);
        }
    }

    @Override // azkaban.executor.ExecutorLoader
    public Pair<Props, Props> fetchExecutionJobProps(int i, String str) throws ExecutorManagerException {
        try {
            return (Pair) createQueryRunner().query(FetchExecutableJobPropsHandler.FETCH_INPUT_OUTPUT_PARAM_EXECUTABLE_NODE, new FetchExecutableJobPropsHandler(), new Object[]{Integer.valueOf(i), str});
        } catch (SQLException e) {
            throw new ExecutorManagerException("Error querying job params " + i + " " + str, e);
        }
    }

    @Override // azkaban.executor.ExecutorLoader
    public List<ExecutableJobInfo> fetchJobHistory(int i, String str, int i2, int i3) throws ExecutorManagerException {
        try {
            List<ExecutableJobInfo> list = (List) createQueryRunner().query(FetchExecutableJobHandler.FETCH_PROJECT_EXECUTABLE_NODE, new FetchExecutableJobHandler(), new Object[]{Integer.valueOf(i), str, Integer.valueOf(i2), Integer.valueOf(i3)});
            if (list == null) {
                return null;
            }
            if (list.isEmpty()) {
                return null;
            }
            return list;
        } catch (SQLException e) {
            throw new ExecutorManagerException("Error querying job info " + str, e);
        }
    }

    @Override // azkaban.executor.ExecutorLoader
    public FileIOUtils.LogData fetchLogs(int i, String str, int i2, int i3, int i4) throws ExecutorManagerException {
        try {
            return (FileIOUtils.LogData) createQueryRunner().query(FetchLogsHandler.FETCH_LOGS, new FetchLogsHandler(i3, i4 + i3), new Object[]{Integer.valueOf(i), str, Integer.valueOf(i2), Integer.valueOf(i3), Integer.valueOf(i3 + i4)});
        } catch (SQLException e) {
            throw new ExecutorManagerException("Error fetching logs " + i + " : " + str, e);
        }
    }

    @Override // azkaban.executor.ExecutorLoader
    public List<Object> fetchAttachments(int i, String str, int i2) throws ExecutorManagerException {
        try {
            String str2 = (String) createQueryRunner().query(FetchExecutableJobAttachmentsHandler.FETCH_ATTACHMENTS_EXECUTABLE_NODE, new FetchExecutableJobAttachmentsHandler(), new Object[]{Integer.valueOf(i), str});
            if (str2 == null) {
                return null;
            }
            return (List) JSONUtils.parseJSONFromString(str2);
        } catch (IOException e) {
            throw new ExecutorManagerException("Error converting job attachments to JSON " + str, e);
        } catch (SQLException e2) {
            throw new ExecutorManagerException("Error query job attachments " + str, e2);
        }
    }

    @Override // azkaban.executor.ExecutorLoader
    public void uploadLogFile(int i, String str, int i2, File... fileArr) throws ExecutorManagerException {
        Connection connection = getConnection();
        try {
            try {
                try {
                    uploadLogFile(connection, i, str, i2, fileArr, this.defaultEncodingType);
                    connection.commit();
                    DbUtils.closeQuietly(connection);
                } catch (SQLException e) {
                    throw new ExecutorManagerException("Error committing log", e);
                }
            } catch (IOException e2) {
                throw new ExecutorManagerException("Error committing log", e2);
            }
        } catch (Throwable th) {
            DbUtils.closeQuietly(connection);
            throw th;
        }
    }

    private void uploadLogFile(Connection connection, int i, String str, int i2, File[] fileArr, AbstractJdbcLoader.EncodingType encodingType) throws ExecutorManagerException, IOException {
        byte[] bArr = new byte[51200];
        int i3 = 0;
        int length = bArr.length;
        int i4 = 0;
        BufferedInputStream bufferedInputStream = null;
        try {
            for (File file : fileArr) {
                try {
                    try {
                        bufferedInputStream = new BufferedInputStream(new FileInputStream(file));
                        int read = bufferedInputStream.read(bArr, i3, length);
                        while (read >= 0) {
                            if (i3 + read == bArr.length) {
                                uploadLogPart(connection, i, str, i2, i4, i4 + bArr.length, encodingType, bArr, bArr.length);
                                i3 = 0;
                                length = bArr.length;
                                i4 += bArr.length;
                            } else {
                                i3 += read;
                                length = bArr.length - i3;
                            }
                            read = bufferedInputStream.read(bArr, i3, length);
                        }
                    } catch (IOException e) {
                        throw new ExecutorManagerException("Error chunking", e);
                    }
                } catch (SQLException e2) {
                    throw new ExecutorManagerException("Error writing log part.", e2);
                }
            }
            if (i3 > 0) {
                uploadLogPart(connection, i, str, i2, i4, i4 + i3, encodingType, bArr, i3);
            }
        } finally {
            IOUtils.closeQuietly(bufferedInputStream);
        }
    }

    private void uploadLogPart(Connection connection, int i, String str, int i2, int i3, int i4, AbstractJdbcLoader.EncodingType encodingType, byte[] bArr, int i5) throws SQLException, IOException {
        QueryRunner queryRunner = new QueryRunner();
        byte[] bArr2 = bArr;
        if (encodingType == AbstractJdbcLoader.EncodingType.GZIP) {
            bArr2 = GZIPUtils.gzipBytes(bArr2, 0, i5);
        } else if (i5 < bArr2.length) {
            bArr2 = Arrays.copyOf(bArr, i5);
        }
        queryRunner.update(connection, "INSERT INTO execution_logs (exec_id, name, attempt, enc_type, start_byte, end_byte, log, upload_time) VALUES (?,?,?,?,?,?,?,?)", new Object[]{Integer.valueOf(i), str, Integer.valueOf(i2), Integer.valueOf(encodingType.getNumVal()), Integer.valueOf(i3), Integer.valueOf(i3 + i5), bArr2, Long.valueOf(DateTime.now().getMillis())});
    }

    @Override // azkaban.executor.ExecutorLoader
    public void uploadAttachmentFile(ExecutableNode executableNode, File file) throws ExecutorManagerException {
        Connection connection = getConnection();
        try {
            try {
                uploadAttachmentFile(connection, executableNode, file, this.defaultEncodingType);
                connection.commit();
                DbUtils.closeQuietly(connection);
            } catch (IOException e) {
                throw new ExecutorManagerException("Error uploading attachments ", e);
            } catch (SQLException e2) {
                throw new ExecutorManagerException("Error committing attachments ", e2);
            }
        } catch (Throwable th) {
            DbUtils.closeQuietly(connection);
            throw th;
        }
    }

    private void uploadAttachmentFile(Connection connection, ExecutableNode executableNode, File file, AbstractJdbcLoader.EncodingType encodingType) throws SQLException, IOException {
        new QueryRunner().update(connection, "UPDATE execution_jobs SET attachments=? WHERE exec_id=? AND flow_id=? AND job_id=? AND attempt=?", new Object[]{GZIPUtils.gzipString(FileUtils.readFileToString(file), "UTF-8"), Integer.valueOf(executableNode.getExecutableFlow().getExecutionId()), executableNode.getParentFlow().getNestedId(), executableNode.getId(), Integer.valueOf(executableNode.getAttempt())});
    }

    private Connection getConnection() throws ExecutorManagerException {
        Connection connection = null;
        try {
            connection = super.getDBConnection(false);
            return connection;
        } catch (Exception e) {
            DbUtils.closeQuietly(connection);
            throw new ExecutorManagerException("Error getting DB connection.", e);
        }
    }

    @Override // azkaban.executor.ExecutorLoader
    public int removeExecutionLogsByTime(long j) throws ExecutorManagerException {
        try {
            return createQueryRunner().update("DELETE FROM execution_logs WHERE upload_time < ?", Long.valueOf(j));
        } catch (SQLException e) {
            e.printStackTrace();
            throw new ExecutorManagerException("Error deleting old execution_logs before " + j, e);
        }
    }
}
