// MultiThreadedCollector.java

/**
 * MultiThreadedCollector.java This file is part of WattDepot.
 *
 * Copyright (C) 2013  Cam Moore
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
package org.wattdepot.client.http.api.collector;

import java.util.Timer;
import java.util.TimerTask;

import org.apache.commons.validator.UrlValidator;
import org.wattdepot.client.http.api.WattDepotClient;
import org.wattdepot.common.domainmodel.CollectorProcessDefinition;
import org.wattdepot.common.domainmodel.Depository;
import org.wattdepot.common.domainmodel.Sensor;
import org.wattdepot.common.domainmodel.SensorModel;
import org.wattdepot.common.exception.BadCredentialException;
import org.wattdepot.common.exception.BadSensorUriException;
import org.wattdepot.common.exception.IdNotFoundException;
import org.wattdepot.common.util.SensorModelHelper;
import org.wattdepot.common.util.Slug;
import org.wattdepot.common.util.tstamp.Tstamp;

/**
 * MultiThreadedCollector - Abstract base class for all Multi-Threaded
 * Collectors.
 * 
 * @author Cam Moore
 * 
 */
public abstract class MultiThreadedCollector extends TimerTask {

  /** Flag for debugging messages. */
  protected boolean debug;
  /** The definition about the collector. */
  protected CollectorProcessDefinition definition;

  /** The client used to communicate with the WattDepot server. */
  protected WattDepotClient client;
  /** The Depository for storing measurements. */
  protected Depository depository;

  /**
   * Initializes the MultiThreadedCollector.
   * 
   * @param serverUri The URI for the WattDepot server.
   * @param username The name of a user defined in the WattDepot server.
   * @param orgId the id of the organization the user is in.
   * @param password The password for the user.
   * @param collectorId The CollectorProcessDefinitionId used to initialize this
   *        collector.
   * @param debug flag for debugging messages.
   * @throws BadCredentialException if the user or password don't match the
   *         credentials in WattDepot.
   * @throws IdNotFoundException if the processId is not defined.
   * @throws BadSensorUriException if the Sensor's URI isn't valid.
   */
  public MultiThreadedCollector(String serverUri, String username, String orgId, String password,
      String collectorId, boolean debug) throws BadCredentialException, IdNotFoundException,
      BadSensorUriException {
    this.client = new WattDepotClient(serverUri, username, orgId, password);
    this.debug = debug;
    this.definition = client.getCollectorProcessDefinition(collectorId);
    this.depository = client.getDepository(definition.getDepositoryId());
    validate();
  }

  /**
   * @param serverUri The URI for the WattDepot server.
   * @param username The name of a user defined in the WattDepot server.
   * @param orgId the id of the user's organization.
   * @param password The password for the user.
   * @param sensorId The id of the Sensor to poll.
   * @param pollingInterval The polling interval in seconds.
   * @param depository The Depository to store the measurements.
   * @param debug flag for debugging messages.
   * @throws BadCredentialException if the user or password don't match the
   *         credentials in WattDepot.
   * @throws BadSensorUriException if the Sensor's URI isn't valid.
   * @throws IdNotFoundException if there is a problem with the sensorId.
   */
  public MultiThreadedCollector(String serverUri, String username, String orgId, String password,
      String sensorId, Long pollingInterval, Depository depository, boolean debug)
      throws BadCredentialException, BadSensorUriException, IdNotFoundException {
    this.client = new WattDepotClient(serverUri, username, orgId, password);
    this.debug = debug;
    this.definition = new CollectorProcessDefinition(Slug.slugify(sensorId + " " + pollingInterval
        + " " + depository.getName()), sensorId, pollingInterval, depository.getName(), null);
    client.putCollectorProcessDefinition(definition);
    this.depository = depository;
    client.putDepository(depository);
    validate();
  }

  /**
   * @return true if everything is good to go.
   */
  public boolean isValid() {
    if (this.client != null && this.definition != null) {
      return true;
    }
    return false;
  }

  /**
   * @param serverUri The URI for the WattDepot server.
   * @param username The name of a user defined in the WattDepot server.
   * @param orgId the user's organization id.
   * @param password The password for the user.
   * @param collectorId The CollectorProcessDefinitionId used to initialize this
   *        collector.
   * @param debug flag for debugging messages.
   * @return true if sensor starts successfully.
   * @throws InterruptedException If sleep is interrupted for some reason.
   * @throws BadCredentialException if the username and password are invalid.
   */
  public static boolean start(String serverUri, String username, String orgId, String password,
      String collectorId, boolean debug) throws InterruptedException, BadCredentialException {

    // Before starting any sensors, confirm that we can connect to the WattDepot
    // server. We do
    // this here because if the server and sensors are running on the same
    // system, at boot time the
    // sensor might start before the server, causing client calls to fail. If
    // this happens, we
    // want to catch it at the top level, where it will result in the sensor
    // process terminating.
    // If we wait to catch it at the per-sensor level, it might cause a sensor
    // to abort for what
    // might be a short-lived problem. The sensor process should be managed by
    // some other process
    // (such as launchd), so it is OK to terminate because it should get
    // restarted if the server
    // isn't up quite yet.
    WattDepotClient staticClient = new WattDepotClient(serverUri, username, orgId, password);
    if (!staticClient.isHealthy()) {
      System.err.format("Could not connect to server %s. Aborting.%n", serverUri);
      // Pause briefly to rate limit restarts if server doesn't come up for a
      // long time
      Thread.sleep(2000);
      return false;
    }
    // Get the collector process definition
    CollectorProcessDefinition definition = null;
    Sensor sensor = null;
    SensorModel model = null;

    try {
      definition = staticClient.getCollectorProcessDefinition(collectorId);
      staticClient.getDepository(definition.getDepositoryId());
      sensor = staticClient.getSensor(definition.getSensorId());
      model = staticClient.getSensorModel(sensor.getModelId());
      // Get SensorModel to determine what type of collector to start.
      if (model.getName().equals(SensorModelHelper.EGAUGE) && model.getVersion().equals("1.0")) {
        Timer t = new Timer();
        try {
          EGaugeCollector collector = new EGaugeCollector(serverUri, username, sensor.getOrganizationId(),
              password, collectorId, debug);
          if (collector.isValid()) {
            System.out.format("Started polling %s sensor at %s%n", sensor.getName(),
                Tstamp.makeTimestamp());
            t.schedule(collector, 0, definition.getPollingInterval() * 1000);
          }
          else {
            System.err.format("Cannot poll %s sensor%n", sensor.getName());
            return false;
          }
        }
        catch (BadSensorUriException e) {
          // TODO Auto-generated catch block
          e.printStackTrace();
        }
        catch (IdNotFoundException e) {
          // TODO Auto-generated catch block
          e.printStackTrace();
        }
      }
      else if (model.getName().equals(SensorModelHelper.SHARK) && model.getVersion().equals("1.03")) {
        Timer t = new Timer();
        try {
          SharkCollector collector = new SharkCollector(serverUri, username, orgId, password,
              collectorId, debug);
          if (collector.isValid()) {
            System.out.format("Started polling %s sensor at %s%n", sensor.getName(),
                Tstamp.makeTimestamp());
            t.schedule(collector, 0, definition.getPollingInterval() * 1000);
          }
          else {
            System.err.format("Cannot poll %s sensor%n", sensor.getName());
            return false;
          }
        }
        catch (IdNotFoundException e) {
          // TODO Auto-generated catch block
          e.printStackTrace();
        }
        catch (BadSensorUriException e) {
          // TODO Auto-generated catch block
          e.printStackTrace();
        }
      }
      else if (model.getName().equals(SensorModelHelper.STRESS)) {
        Timer t = new Timer();
        try {
          StressCollector collector = new StressCollector(serverUri, username, orgId, password, collectorId, debug);
          if (collector.isValid()) {
            System.out.format("Started polling %s sensor at %s%n", sensor.getName(),
                Tstamp.makeTimestamp());
            t.schedule(collector, 0, definition.getPollingInterval() * 1000);
          }
          else {
            System.err.format("Cannot poll %s sensor%n", sensor.getName());
            return false;
          }
        }
        catch (IdNotFoundException e) {
          // TODO Auto-generated catch block
          e.printStackTrace();
        }
        catch (BadSensorUriException e) {
          // TODO Auto-generated catch block
          e.printStackTrace();
        }
      }
      else if (model.getName().equals(SensorModelHelper.WEATHER)) {
        Timer t = new Timer();
        try {
          NOAAWeatherCollector collector = new NOAAWeatherCollector(serverUri, username, orgId, password, collectorId, debug);
          if (collector.isValid()) {
            System.out.format("Started polling %s sensor at %s%n", sensor.getName(),
                Tstamp.makeTimestamp());
            t.schedule(collector, 0, definition.getPollingInterval() * 1000);
          }
          else {
            System.err.format("Cannot poll %s sensor%n", sensor.getName());
            return false;
          }
        }       
        catch (IdNotFoundException e) {
          // TODO Auto-generated catch block
          e.printStackTrace();
        }
        catch (BadSensorUriException e) {
          // TODO Auto-generated catch block
          e.printStackTrace();
        }
      }
    }
    catch (IdNotFoundException e) {
      System.err.println(e.getMessage());
      return false;
    }
    return true;
  }

  /**
   * @throws BadSensorUriException if the Sensor's URI isn't valid.
   * @throws IdNotFoundException if there is a problem with the Collector
   *         Process Definition.
   */
  private void validate() throws BadSensorUriException, IdNotFoundException {
    Sensor s = client.getSensor(definition.getSensorId());
    String[] schemes = { "http", "https" };
    UrlValidator urlValidator = new UrlValidator(schemes);
    if (!urlValidator.isValid(s.getUri())) {
      throw new BadSensorUriException(s.getUri() + " is not a valid URI.");
    }
  }
}