MeasurementPruner.java

/**
 * MeasurementGarbageCollector.java This file is part of WattDepot.
 *
 * Copyright (C) 2014  Cam Moore
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
package org.wattdepot.server.measurement.pruning;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;

import javax.xml.datatype.DatatypeConfigurationException;
import javax.xml.datatype.XMLGregorianCalendar;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.wattdepot.common.domainmodel.MeasurementPruningDefinition;
import org.wattdepot.common.domainmodel.Measurement;
import org.wattdepot.common.domainmodel.Organization;
import org.wattdepot.common.domainmodel.SensorGroup;
import org.wattdepot.common.exception.IdNotFoundException;
import org.wattdepot.common.util.DateConvert;
import org.wattdepot.common.util.tstamp.Tstamp;
import org.wattdepot.server.ServerProperties;
import org.wattdepot.server.WattDepotPersistence;

/**
 * MeasurementPruner - Removes measurements from the WattDepot repository that
 * were sampled at a higher frequency than desired.
 * 
 * @author Cam Moore
 * 
 */
public class MeasurementPruner extends TimerTask {

  /**
   * Size of each retrieval window used while pruning, so that measurements are
   * fetched in smaller batches. Passed to Tstamp.getTimestampList as the
   * interval; presumably minutes (6 * 60 = six hours) - confirm against Tstamp.
   */
  public static final int PRUNE_WINDOW = 6 * 60;

  /** Persistence layer used to read, delete and update repository data. */
  private final WattDepotPersistence persistance;
  /** The pruning definition this pruner executes. */
  private final MeasurementPruningDefinition definition;
  /** When true, progress and timing information is printed to System.out. */
  private final boolean debug;

  /**
   * Create a MeasurementGarbageCollector.
   * 
   * @param properties The ServerProperties that define the type of persistence.
   * @param gcdId The id of the MeasurementPruningDefintion.
   * @param orgId The id of the Organization.
   * @param debug true if want debugging information.
   * @throws Exception If there is a problem instantiating the
   *         WattDepotPersistence.
   */
  public MeasurementPruner(ServerProperties properties, String gcdId, String orgId, boolean debug)
      throws Exception {
    // Get the WattDepotPersistence implementation.
    String depotClass = properties.get(ServerProperties.WATT_DEPOT_IMPL_KEY);
    this.persistance = (WattDepotPersistence) Class.forName(depotClass)
        .getConstructor(ServerProperties.class).newInstance(properties);
    this.definition = this.persistance.getMeasurementPruningDefinition(gcdId, orgId, true);
    this.debug = debug;
  }

  /**
   * Accessor for the pruning definition loaded at construction time.
   * 
   * @return the MeasurementPruningDefinition this pruner works from.
   */
  public MeasurementPruningDefinition getDefinition() {
    return this.definition;
  }

  /**
   * Computes the end of the collection window: the current time moved back by
   * the definition's ignore-window days.
   * 
   * @return The end of the collection window, or null if the date conversion
   *         fails (the failure is printed to standard error).
   */
  private Date getEndDate() {
    try {
      final XMLGregorianCalendar current = DateConvert.convertDate(new Date());
      final XMLGregorianCalendar windowEnd = Tstamp.incrementDays(current,
          -1 * definition.getIgnoreWindowDays());
      return DateConvert.convertXMLCal(windowEnd);
    }
    catch (DatatypeConfigurationException e) {
      // Conversion failed; report it and signal the failure with null.
      e.printStackTrace();
      return null;
    }
  }

  /**
   * Fetches every measurement of the given sensor that falls inside the
   * collection window, i.e. between getStartDate() and getEndDate().
   * 
   * @param sensorId the id of the sensor to get measurements for.
   * @return The Measurements in the collection window.
   * @throws IdNotFoundException if there is a problem with the
   *         MeasurementPruningDefinition.
   */
  private List<Measurement> getMeasurementsToCheck(String sensorId) throws IdNotFoundException {
    Date start = getStartDate();
    Date end = getEndDate();
    if (debug) {
      SimpleDateFormat df = new SimpleDateFormat();
      System.out.println("Collection window for " + sensorId + " is from " + df.format(start)
          + " to " + df.format(end));
    }
    // Pass the bounds in chronological order (start first, then end), the same
    // order pruneMeasurements() uses for its window queries.
    return this.persistance.getMeasurements(this.definition.getDepositoryId(),
        this.definition.getOrgId(), sensorId, start, end, false);
  }

  /**
   * Collects the measurements that would be pruned for this definition.
   * 
   * The definition's sensor id is first looked up as a sensor group. If a group
   * is found, the candidates of every member sensor are concatenated; otherwise
   * the id is treated as a single sensor. In debug mode the elapsed time is
   * printed.
   * 
   * @return A list of measurements to garbage collect. They are at a higher
   *         sample rate than desired.
   * @throws IdNotFoundException if there is a problem with the
   *         MeasurementPruningDefinition.
   */
  public List<Measurement> getMeasurementsToDelete() throws IdNotFoundException {
    final long began = debug ? System.nanoTime() : 0L;
    final String targetId = this.definition.getSensorId();
    final SensorGroup members = this.persistance.getSensorGroup(targetId,
        this.definition.getOrgId(), false);
    List<Measurement> doomed;
    if (members == null) {
      // Not a group: the id names one sensor.
      doomed = getMeasurementsToDelete(targetId);
    }
    else {
      doomed = new ArrayList<Measurement>();
      for (String memberId : members.getSensors()) {
        doomed.addAll(getMeasurementsToDelete(memberId));
      }
    }
    if (debug) {
      final long elapsed = System.nanoTime() - began;
      System.out.println("getMeasurementsToDelete() took " + (elapsed / 1E9) + " secs.");
    }
    return doomed;
  }

  /**
   * Selects the measurements of one sensor that are closer than the
   * definition's minimum gap to the most recently kept measurement.
   * 
   * The first measurement is always kept and serves as the initial reference.
   * Each following measurement is either marked for deletion (gap too small) or
   * becomes the new reference.
   * 
   * @param sensorId The id of the sensor making the measurements.
   * @return The List of measurements to delete for the given sensor.
   * @throws IdNotFoundException if there is a problem with the sensorId.
   */
  private List<Measurement> getMeasurementsToDelete(String sensorId) throws IdNotFoundException {
    final List<Measurement> candidates = getMeasurementsToCheck(sensorId);
    final List<Measurement> doomed = new ArrayList<Measurement>();
    // NOTE(review): the bound excludes the final measurement, so it is never
    // examined (and never deleted) - confirm that this is intentional.
    final int stop = candidates.size() - 1;
    int keptIndex = 0;
    for (int i = 1; i < stop; i++) {
      final Measurement current = candidates.get(i);
      final long millisApart = current.getDate().getTime()
          - candidates.get(keptIndex).getDate().getTime();
      final long secondsApart = Math.abs(millisApart / 1000);
      if (secondsApart < definition.getMinGapSeconds()) {
        doomed.add(current);
      }
      else {
        keptIndex = i;
      }
    }
    return doomed;
  }

  /**
   * Accessor for the persistence layer created at construction time.
   * 
   * @return the WattDepotPersistence instance used by this pruner.
   */
  public WattDepotPersistence getPersistance() {
    return this.persistance;
  }

  /**
   * Computes the start of the collection window: the current time moved back
   * by the definition's ignore-window days and then by its collect-window days.
   * 
   * @return The start of the collection window, or null if the date conversion
   *         fails (the failure is printed to standard error).
   */
  private Date getStartDate() {
    try {
      final XMLGregorianCalendar current = DateConvert.convertDate(new Date());
      // Step back over the ignored period first, then over the collected one.
      final XMLGregorianCalendar ignoreEdge = Tstamp.incrementDays(current,
          -1 * definition.getIgnoreWindowDays());
      final XMLGregorianCalendar windowStart = Tstamp.incrementDays(ignoreEdge,
          -1 * definition.getCollectWindowDays());
      return DateConvert.convertXMLCal(windowStart);
    }
    catch (DatatypeConfigurationException e) {
      // Conversion failed; report it and signal the failure with null.
      e.printStackTrace();
      return null;
    }
  }

  /**
   * Processes the command line arguments and either prunes once right away
   * (-s) or schedules the pruner on a Timer.
   * 
   * Options: -h prints help and exits; -o sets the organization id (defaults
   * to the admin group's id); -m sets the MeasurementPruningDefinition id and
   * is required; -d enables debug output; -s runs a single pruning pass.
   * 
   * @param args The command line arguments.
   * @throws Exception if there is a problem.
   */
  public static void main(String[] args) throws Exception {
    Options options = new Options();
    options
        .addOption("h", "help", false,
            "Usage: MeasurementPruner -o <orgId> -m <measurement pruning definition id>"
                + " [-d] [-s]");
    options.addOption("o", "orgId", true, "Organization Id.");
    options.addOption("m", "mpd", true, "MeasurementPruningDefinition Id.");
    options.addOption("d", "debug", false, "Display debugging information.");
    options.addOption("s", "single", false, "Run gc only once, right away.");

    CommandLine cmd = null;
    CommandLineParser parser = new PosixParser();
    HelpFormatter formatter = new HelpFormatter();
    try {
      cmd = parser.parse(options, args);
    }
    catch (ParseException e) {
      System.err.println("Command line parsing failed. Reason: " + e.getMessage() + ". Exiting.");
      System.exit(1);
    }
    if (cmd.hasOption("h")) {
      formatter.printHelp("MeasurementGarbageCollector", options);
      System.exit(0);
    }
    // Without a definition id there is nothing to prune; fail early with a
    // clear message instead of handing a null id to the persistence layer.
    if (!cmd.hasOption("m")) {
      System.err.println("Missing required option -m <measurement pruning definition id>."
          + " Exiting.");
      formatter.printHelp("MeasurementGarbageCollector", options);
      System.exit(1);
    }
    String gcdId = cmd.getOptionValue("m");
    String orgId;
    if (cmd.hasOption("o")) {
      orgId = cmd.getOptionValue("o");
    }
    else {
      orgId = Organization.ADMIN_GROUP.getId();
    }
    boolean debug = cmd.hasOption("d");
    boolean single = cmd.hasOption("s");
    if (debug) {
      System.out.println("Measurement Garbage Collection:");
      System.out.println("Org Id = " + orgId);
      System.out.println("GCD Id = " + gcdId);
      System.out.println("Single run = " + single);
    }
    ServerProperties properties = new ServerProperties();
    MeasurementPruner mgc = new MeasurementPruner(properties, gcdId, orgId, debug);
    if (single) {
      mgc.pruneMeasurements();
    }
    else {
      // Set up the TimerTask to run the gc at the right time.
      if (debug) {
        System.out.println("Setting up Timer for " + mgc);
      }
      Timer t = new Timer();
      t.schedule(mgc, mgc.millisToNextRun(), mgc.getGCPeriod());
    }
  }

  /**
   * Works out how long to wait before the first scheduled run. A definition
   * without a next-run time, or one whose next-run time has already passed,
   * yields zero. In debug mode the unclamped delay is printed.
   * 
   * @return The number of milliseconds to wait till the next expected run time.
   */
  private long millisToNextRun() {
    if (debug) {
      System.out.print("milliseconds to next run is ");
    }
    final Date scheduled = definition.getNextRun();
    if (scheduled == null) {
      if (debug) {
        System.out.println("0");
      }
      return 0L;
    }
    final long remaining = scheduled.getTime() - System.currentTimeMillis();
    if (debug) {
      System.out.println(remaining);
    }
    // A run time in the past means "run immediately".
    return Math.max(0L, remaining);
  }

  /**
   * Computes the interval between pruning runs. The default is one day; when
   * the definition's collect window is longer than one day, the period is the
   * collect window minus one day.
   * 
   * @return The number of milliseconds to wait between GC runs.
   */
  private long getGCPeriod() {
    // Use long arithmetic: in int, (days - 1) * 86,400,000 overflows once
    // days - 1 reaches 25 and would produce a negative or bogus period.
    final long millisPerDay = 24L * 60 * 60 * 1000;
    long period = millisPerDay; // defaults to once a day
    if (definition.getCollectWindowDays() > 1) {
      period = (definition.getCollectWindowDays() - 1) * millisPerDay;
      if (debug) {
        System.out.println("GC period is " + period + " milliseconds.");
      }
    }
    return period;
  }

  /**
   * Timer entry point: performs one pruning pass. Failures are printed to
   * standard error and never propagated.
   * 
   * @see java.util.TimerTask#run()
   */
  @Override
  public void run() {
    try {
      pruneMeasurements();
    }
    catch (DatatypeConfigurationException e) {
      e.printStackTrace();
    }
    catch (IdNotFoundException e) {
      e.printStackTrace();
    }
    catch (RuntimeException e) {
      // An unchecked exception escaping a TimerTask terminates the Timer thread
      // and cancels every future run, so report it here and keep the schedule.
      e.printStackTrace();
    }
  }

  /**
   * Prunes the measurements described by the definition.
   * 
   * The collection window (getStartDate() to getEndDate()) is split into
   * PRUNE_WINDOW-sized sub-windows via Tstamp.getTimestampList. If the
   * definition's sensor id names a sensor group, every member sensor is
   * pruned; otherwise the id is pruned as a single sensor. Afterwards the
   * definition's last-started time, last-completed time and deleted count are
   * stored through the persistence layer; a failure of that final update is
   * printed and not propagated.
   * 
   * @throws DatatypeConfigurationException if there is a problem with
   *         DateConvert.
   * @throws IdNotFoundException if there is a problem with persistence.
   */
  public void pruneMeasurements() throws DatatypeConfigurationException, IdNotFoundException {
    Date lastStarted = new Date();
    if (debug) {
      System.out.println("Starting run at " + new SimpleDateFormat().format(lastStarted));
    }
    // figure out the 6 hour windows to delete
    XMLGregorianCalendar start = DateConvert.convertDate(getStartDate());
    XMLGregorianCalendar end = DateConvert.convertDate(getEndDate());
    List<XMLGregorianCalendar> windows = Tstamp.getTimestampList(start, end, PRUNE_WINDOW);
    String sensorId = this.definition.getSensorId();
    SensorGroup group = this.persistance
        .getSensorGroup(sensorId, this.definition.getOrgId(), false);
    int deleted = 0;
    if (group != null) {
      for (String s : group.getSensors()) {
        deleted += pruneSensor(s, windows);
      }
    }
    else {
      deleted = pruneSensor(sensorId, windows);
    }
    Date lastCompleted = new Date();
    this.definition.setLastStarted(lastStarted);
    this.definition.setLastCompleted(lastCompleted);
    this.definition.setNumMeasurementsCollected(deleted);
    try {
      this.persistance.updateMeasurementPruningDefinition(this.definition);
    }
    catch (IdNotFoundException e) {
      e.printStackTrace();
    }
    if (debug) {
      System.out.println("Finished run at " + new SimpleDateFormat().format(lastCompleted));
      System.out.println("Deleted " + deleted + " measurements.");
    }
  }

  /**
   * Prunes one sensor, window by window. Within each window the first
   * measurement is kept as the reference; each later measurement that is closer
   * than the definition's minimum gap to the current reference is deleted,
   * otherwise it becomes the new reference.
   * 
   * @param sensorId The id of the sensor whose measurements are pruned.
   * @param windows Consecutive window boundaries; window i spans entries i and
   *        i + 1.
   * @return The number of measurements deleted for this sensor.
   * @throws DatatypeConfigurationException if there is a problem with
   *         DateConvert.
   * @throws IdNotFoundException if there is a problem with persistence.
   */
  private int pruneSensor(String sensorId, List<XMLGregorianCalendar> windows)
      throws DatatypeConfigurationException, IdNotFoundException {
    int deleted = 0;
    for (int i = 0; i < windows.size() - 1; i++) {
      Date windowStart = DateConvert.convertXMLCal(windows.get(i));
      Date windowEnd = DateConvert.convertXMLCal(windows.get(i + 1));
      List<Measurement> check = this.persistance.getMeasurements(
          this.definition.getDepositoryId(), this.definition.getOrgId(), sensorId, windowStart,
          windowEnd, false);
      int size = check.size();
      if (debug) {
        System.out.println("Sensor " + sensorId + ": " + windowStart + " to " + windowEnd
            + " has " + size + " measurements");
      }
      int baseIndex = 0;
      // NOTE(review): the bound excludes the last measurement of each window, so
      // it is never deleted - confirm that this is intentional.
      for (int index = 1; index < size - 1; index++) {
        long secondsBetween = Math.abs((check.get(index).getDate().getTime() - check
            .get(baseIndex).getDate().getTime()) / 1000);
        if (secondsBetween < definition.getMinGapSeconds()) {
          this.persistance.deleteMeasurement(this.definition.getDepositoryId(),
              this.definition.getOrganizationId(), check.get(index).getId());
          deleted++;
        }
        else {
          baseIndex = index;
        }
      }
    }
    return deleted;
  }
}