/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pig.pen;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapOnly;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduceCounter;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.data.DataBag;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.builtin.ReadScalars;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.PigNullableWritable;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.PlanWalker;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.ConfigurationValidator;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.Pair;
import org.apache.pig.newplan.logical.relational.LOLoad;
import org.apache.pig.pen.ExampleGenerator;
import org.apache.pig.pen.IllustratorAttacher;
import org.apache.pig.pen.util.LineageTracer;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
/**
 * Runs the MapReduce jobs compiled from a physical plan inside the current
 * JVM, feeding each job's map and reduce classes through their illustrator
 * contexts instead of submitting the jobs for execution.
 *
 * <p>Data produced by the stores of one job is kept in an in-memory buffer
 * keyed by store file name, so that loads of later jobs (and
 * {@link ReadScalars} functions) can read it back by file name.
 */
public class LocalMapReduceSimulator {
    /** Compiles the physical plan into a MapReduce operator plan. */
    private final MapReduceLauncher launcher = new MapReduceLauncher();

    /**
     * Physical-operator mapping copied from every simulated MapReduce
     * operator. When a key maps to several values only the last one is kept.
     */
    private final Map<PhysicalOperator, PhysicalOperator> phyToMRMap = new HashMap<PhysicalOperator, PhysicalOperator>();

    /**
     * Compiles {@code php} into MapReduce jobs and simulates every job
     * in-process, one batch of waiting jobs at a time, until the MapReduce
     * plan is empty.
     *
     * @param php      physical plan to compile and simulate
     * @param baseData input bags keyed by logical load; consulted when a
     *                 physical load's file was not produced by an earlier
     *                 store. May be {@code null}.
     * @param lineage  not used by this method
     * @param attacher revisits the compiled map/reduce plans and supplies the
     *                 data bag associated with each store
     * @param eg       not used by this method
     * @param pc       Pig context supplying the properties used for
     *                 compilation and job configuration
     * @throws ExecException if the job compiler returns no job control
     *                       ("Native execution is not supported")
     * @throws PigException  if compilation or plan traversal fails
     * @throws IOException   if deserialization or a map/reduce run fails
     * @throws InterruptedException if a map/reduce run is interrupted
     */
    public void launchPig(PhysicalPlan php, Map<LOLoad, DataBag> baseData, LineageTracer lineage, IllustratorAttacher attacher, ExampleGenerator eg, PigContext pc) throws PigException, IOException, InterruptedException {
        this.phyToMRMap.clear();
        MROperPlan mrp = this.launcher.compile(php, pc);
        ConfigurationValidator.validatePigProperties(pc.getProperties());
        Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
        JobControlCompiler jcc = new JobControlCompiler(pc, conf);
        // Map-side output of the current job; consumed by its reducer and
        // cleared before the next job starts.
        ArrayList<Pair<PigNullableWritable, Writable>> intermediateData = new ArrayList<Pair<PigNullableWritable, Writable>>();
        Map<Job, MapReduceOper> jobToMroMap = jcc.getJobMroMap();
        // Store file name -> data bag, shared across all simulated jobs.
        Map<String, DataBag> output = new HashMap<String, DataBag>();
        // Raw on purpose: the element type of the target lists is not among
        // this file's imports. Used for loads that have no in-memory input.
        ArrayList emptyInpTargets = new ArrayList();
        while (mrp.size() != 0) {
            JobControl jc = jcc.compile(mrp, "Illustrator");
            if (jc == null) {
                throw new ExecException("Native execution is not supported");
            }
            // Typed list: iterating a raw ArrayList as Job does not compile.
            ArrayList<Job> jobs = jc.getWaitingJobs();
            for (Job job : jobs) {
                JobConf jobConf = job.getJobConf();
                FileLocalizer.setInitialized(false);
                // Indexed below by the position of each load in the map plan.
                ArrayList inpTargets = (ArrayList)ObjectSerializer.deserialize(jobConf.get("pig.inpTargets"));
                intermediateData.clear();
                MapReduceOper mro = jobToMroMap.get(job);

                // Revisit the plans, then remember the first root of the
                // reduce plan; it is handed to the reducer's context below.
                PhysicalOperator pack = null;
                if (!mro.mapPlan.isEmpty()) {
                    attacher.revisit(mro.mapPlan);
                }
                if (!mro.reducePlan.isEmpty()) {
                    attacher.revisit(mro.reducePlan);
                    pack = (PhysicalOperator)mro.reducePlan.getRoots().get(0);
                }

                LinkedList<POLoad> lds = PlanHelper.getPhysicalOperators(mro.mapPlan, POLoad.class);

                // Start from an empty list so a job whose plans are both
                // empty yields no stores instead of a NullPointerException.
                LinkedList<POStore> stores = new LinkedList<POStore>();
                if (!mro.mapPlan.isEmpty()) {
                    stores.addAll(PlanHelper.getPhysicalOperators(mro.mapPlan, POStore.class));
                }
                if (!mro.reducePlan.isEmpty()) {
                    stores.addAll(PlanHelper.getPhysicalOperators(mro.reducePlan, POStore.class));
                }
                // Publish each store's bag under its file name before running.
                for (POStore store : stores) {
                    output.put(store.getSFile().getFileName(), attacher.getDataMap().get(store));
                }

                // Hand the shared buffer to every ReadScalars UDF in the plans.
                new OutputAttacher(mro.mapPlan, output).visit();
                if (!mro.reducePlan.isEmpty()) {
                    new OutputAttacher(mro.reducePlan, output).visit();
                }

                // First pass: drop every load whose data is available in
                // memory. This must finish before any mapper runs because all
                // mappers below share the same map plan.
                for (POLoad ld : lds) {
                    if (lookupInput(ld, output, baseData) != null) {
                        mro.mapPlan.remove(ld);
                    }
                }

                // Second pass: run one mapper per load.
                int index = 0;
                for (POLoad ld : lds) {
                    DataBag input = lookupInput(ld, output, baseData);
                    boolean needFileInput = input == null;
                    PigSplit split = new PigSplit(null, index, needFileInput ? emptyInpTargets : (ArrayList)inpTargets.get(index), 0);
                    ++index;

                    PigMapBase map;
                    Mapper.Context context;
                    if (mro.reducePlan.isEmpty()) {
                        // Map-only job.
                        map = new PigMapOnly.Map();
                        context = map.getIllustratorContext((Configuration)jobConf, input, intermediateData, split);
                        if (mro.isCounterOperation()) {
                            if (mro.isRowNumber()) {
                                map = new PigMapReduceCounter.PigMapCounter();
                            }
                            // NOTE(review): this cast fails for a counter
                            // operation that is not a row number; kept as in
                            // the original - confirm that case cannot occur.
                            context = ((PigMapReduceCounter.PigMapCounter)map).getIllustratorContext((Configuration)jobConf, input, intermediateData, split);
                        }
                    } else {
                        // Map phase of a full map-reduce job: pick the mapper
                        // class from the job configuration.
                        if ("true".equals(jobConf.get("pig.usercomparator"))) {
                            map = new PigGenericMapReduce.MapWithComparator();
                        } else if (!"".equals(jobConf.get("pig.keyDistFile", ""))) {
                            map = new PigGenericMapReduce.MapWithPartitionIndex();
                        } else {
                            map = new PigGenericMapReduce.Map();
                        }
                        context = map.getIllustratorContext((Configuration)jobConf, input, intermediateData, split);
                    }
                    map.setMapPlan(mro.mapPlan);
                    map.run(context);
                }

                if (!mro.reducePlan.isEmpty()) {
                    // The package operator is passed to the reducer's context
                    // separately, so it is taken out of the plan first.
                    if (pack instanceof POPackage) {
                        mro.reducePlan.remove(pack);
                    }
                    PigMapReduce.Reduce reduce;
                    if ("true".equals(jobConf.get("pig.usercomparator"))) {
                        reduce = new PigGenericMapReduce.ReduceWithComparator();
                    } else {
                        reduce = new PigMapReduce.Reduce();
                    }
                    Reducer.Context context = reduce.getIllustratorContext(job, intermediateData, (POPackage)pack);
                    if (mro.isCounterOperation()) {
                        reduce = new PigMapReduceCounter.PigReduceCounter();
                        context = ((PigMapReduceCounter.PigReduceCounter)reduce).getIllustratorContext(job, intermediateData, (POPackage)pack);
                    }
                    reduce.setReducePlan(mro.reducePlan);
                    reduce.run(context);
                }

                // Flatten this operator's multi-valued mapping into ours.
                for (PhysicalOperator key : mro.phyToMRMap.keySet()) {
                    for (PhysicalOperator value : mro.phyToMRMap.get(key)) {
                        this.phyToMRMap.put(key, value);
                    }
                }
            }
            // Called for its effect on the MapReduce plan (the loop condition
            // depends on it); the returned count is not needed here.
            jcc.updateMROpPlan(new LinkedList<Job>());
        }
        jcc.reset();
    }

    /**
     * Finds the in-memory input for a load.
     *
     * <p>The shared output buffer is checked first, by the load's file name.
     * If it has no entry and {@code baseData} is non-null, the value of the
     * first logical load whose schema file equals that file name is used.
     *
     * @param ld       physical load whose input is wanted
     * @param output   store file name -> data bag buffer
     * @param baseData base input bags keyed by logical load; may be
     *                 {@code null}
     * @return the matching bag, or {@code null} if none is available
     */
    private static DataBag lookupInput(POLoad ld, Map<String, DataBag> output, Map<LOLoad, DataBag> baseData) {
        String fileName = ld.getLFile().getFileName();
        DataBag input = output.get(fileName);
        if (input == null && baseData != null) {
            for (Map.Entry<LOLoad, DataBag> entry : baseData.entrySet()) {
                if (entry.getKey().getSchemaFile().equals(fileName)) {
                    return entry.getValue();
                }
            }
        }
        return input;
    }

    /**
     * Returns the physical-operator mapping gathered by the most recent
     * {@link #launchPig} call.
     *
     * @return the live internal map (not a copy); it is cleared at the start
     *         of every {@code launchPig} call
     */
    public Map<PhysicalOperator, PhysicalOperator> getPhyToMRMap() {
        return this.phyToMRMap;
    }

    /**
     * Plan visitor that walks a physical plan depth-first and gives every
     * {@link ReadScalars} user function the shared output buffer.
     */
    private static class OutputAttacher
    extends PhyPlanVisitor {
        /** Store file name -> data bag buffer handed to ReadScalars. */
        private final Map<String, DataBag> outputBuffer;

        /**
         * @param plan   plan to walk
         * @param output buffer to attach to each ReadScalars function
         */
        OutputAttacher(PhysicalPlan plan, Map<String, DataBag> output) {
            super(plan, (PlanWalker<PhysicalOperator, PhysicalPlan>)new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
            this.outputBuffer = output;
        }

        /**
         * Attaches the output buffer when the visited user function wraps a
         * {@link ReadScalars} instance; other functions are left untouched.
         */
        @Override
        public void visitUserFunc(POUserFunc userFunc) throws VisitorException {
            // instanceof is false for null, so no separate null check needed.
            if (userFunc.getFunc() instanceof ReadScalars) {
                ((ReadScalars)userFunc.getFunc()).setOutputBuffer(this.outputBuffer);
            }
        }
    }
}

