
HDPCD-Java Review Notes (16)

2017-10-17 14:40
PIG

Pig uses a high-level, SQL-like programming language named Pig Latin.
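As a taste of the language, here is a short hypothetical script (the file and field names are illustrative) that counts ERROR records per code:

logevents = LOAD 'pig/input/logs' USING PigStorage(',')
    AS (level:chararray, code:int);
errors = FILTER logevents BY level == 'ERROR';
bycode = GROUP errors BY code;
counts = FOREACH bycode GENERATE group, COUNT(errors);
DUMP counts;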

Pig was created at Yahoo to make it easier to analyze the data in HDFS without the complexities of writing a traditional MapReduce program. The developers of Pig published their philosophy to summarize the goals of Pig, using comparisons to actual pigs:

Pigs eat anything
Pig can process any data, structured or unstructured
Pigs live anywhere
Pig can run on any parallel data processing framework, so Pig scripts do not have to run just on Hadoop
Pigs are domestic animals
Pig is designed to be easily controlled and modified by its users
Pigs fly
Pig is designed to process data quickly
Pig Latin scripts can be executed in one of three ways:

Pig script
Write a Pig Latin program in a text file and execute it using the pig executable
Grunt shell
Enter Pig statements manually one-at-a-time from a CLI tool known as the Grunt interactive shell
Embedded in Java
Use the PigServer class to execute a Pig query from within Java code
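A minimal sketch of the embedded approach (the statements and file path are illustrative; PigServer lives in org.apache.pig):

import java.util.Iterator;

import org.apache.pig.PigServer;
import org.apache.pig.data.Tuple;

public class EmbeddedPig {
    public static void main(String[] args) throws Exception {
        // "local" runs Pig in local mode; "mapreduce" submits jobs to the cluster
        PigServer pig = new PigServer("local");
        // Each registerQuery call adds one statement to the logical plan
        pig.registerQuery("employees = LOAD 'pig/input/File1' USING PigStorage(',') "
                + "AS (name:chararray, age:int, zip:int, salary:double);");
        pig.registerQuery("highpaid = FILTER employees BY salary > 50000.0;");
        // openIterator acts like DUMP: it triggers execution of the plan
        Iterator<Tuple> it = pig.openIterator("highpaid");
        while (it.hasNext()) {
            System.out.println(it.next());
        }
    }
}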
Pig executes in a unique fashion. Some commands build on previous commands, while certain commands trigger a MapReduce job:

• During execution, each statement is processed by the Pig interpreter.

• If a statement is valid, it gets added to a logical plan built by the interpreter.

• The steps in the logical plan do not actually execute until a DUMP or STORE command is reached.
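For example, in this sketch (using the File1 schema from the examples below), the LOAD and FILTER statements only extend the logical plan; nothing executes until the DUMP:

employees = LOAD 'pig/input/File1' USING PigStorage(',')
    AS (name:chararray, age:int, zip:int, salary:double);
adults = FILTER employees BY age >= 21;   -- added to the plan, no job runs yet
DUMP adults;                              -- triggers a MapReduce job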

The Grunt Shell

Grunt is an interactive shell that enables users to enter Pig Latin statements and also interact with HDFS. 

To enter the Grunt shell, run the pig executable in the PIG_HOME/bin folder.
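A short sketch of a Grunt session (the HDFS path is hypothetical); note that Grunt can also run HDFS commands such as fs -ls:

$ pig
grunt> fs -ls pig/input
grunt> employees = LOAD 'pig/input/File1' USING PigStorage(',');
grunt> quit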

Pig Data Types

Pig has the following built-in scalar data types:

int
A 32-bit signed integer
long
A 64-bit signed integer
float
A 32-bit floating-point number
double
A 64-bit floating-point number
chararray
Strings of Unicode characters, represented as java.lang.String objects
bytearray
A blob or array of bytes
boolean
Can be either true or false (case-insensitive)
datetime
Stores a date and time in the format 1970-01-01T00:00:00.000+00:00
biginteger and bigdecimal
Map to Java’s BigInteger and BigDecimal classes, respectively, and are useful when performing precision arithmetic
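Types are typically declared in the AS clause of a LOAD statement. A short sketch (the file and field names are hypothetical):

trades = LOAD 'pig/input/trades' USING PigStorage(',')
    AS (symbol:chararray, volume:long, close:double,
        halted:boolean, traded_on:datetime);
DESCRIBE trades;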
The FOREACH GENERATE Operator



The following example command takes in the salaries relation and generates a new relation that only contains two of the columns in salaries:

A = FOREACH salaries GENERATE age, salary;
DESCRIBE A;
A: {age: int, salary: double}

More Pig Examples

The ORDER BY command enables sorting the data in a relation:

employees = LOAD 'pig/input/File1' USING PigStorage(',')
    AS (name:chararray, age:int, zip:int, salary:double);
sorted = ORDER employees BY salary;

The LIMIT command limits the number of output tuples for a relation:

employees = LOAD 'pig/input/File1' USING PigStorage(',')
    AS (name:chararray, age:int, zip:int, salary:double);
agegroup = GROUP employees BY age;
h = LIMIT agegroup 100;

The JOIN command performs a join of two relations:

e1 = LOAD 'pig/input/File1' USING PigStorage(',')
      AS (name:chararray, age:int, zip:int, salary:double);
e2 = LOAD 'pig/input/File2' USING PigStorage(',')
      AS (name:chararray, phone:chararray);
e3 = JOIN e1 BY name, e2 BY name;
DESCRIBE e3;
DUMP e3;
The output of the DESCRIBE statement looks like:

e3: {e1::name: chararray, e1::age: int,
     e1::zip: int, e1::salary: double,
     e2::name: chararray, e2::phone: chararray}

The output of the DUMP will look like:

(Joe,21,94085,50000.0,Joe,4085559898)
(Joe,21,94085,50000.0,Joe,4085557777)
(Tom,21,94085,5000.0,Tom,4085551211)
(Tom,21,94085,5000.0,Tom,6505550123)
(John,45,95014,25000.0,John,4085554332)

Pig User-Defined Functions

The Pig API has a large collection of built-in functions for performing common tasks and computations.

To write a UDF in Java, follow these steps:

·         Write a Java class that extends EvalFunc.
·         Deploy the class in a JAR file.
·         Register the JAR file in the Pig script using the REGISTER command.
·         Optionally define an alias for the UDF using the DEFINE command.
A UDF Example

The following UDF adds a comma between two input strings:

package com.hortonworks.udfs;

import java.io.IOException;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;

public class CONCAT_COMMA extends EvalFunc<String> {
  // Called once per record; the Tuple holds the arguments passed to the UDF
  @Override
  public String exec(Tuple input) throws IOException {
    String first = input.get(0).toString().trim();
    String second = input.get(1).toString().trim();
    return first + ", " + second;
  }
}

The generic type parameter of EvalFunc specifies the data type of the return value.
The exec method is called when the UDF is invoked from the Pig script.

The input parameter is a Tuple instance, which allows for an arbitrary number of arguments. Use the get method of Tuple to retrieve the arguments passed in.

Invoking a UDF

Before a UDF can be invoked, the function needs to be registered by the Pig script. Use the REGISTER command to register a JAR:

register my.jar;

Once the JAR is registered, call the UDF using its fully-qualified class name:

x = FOREACH logevents
   GENERATE com.hortonworks.udfs.CONCAT_COMMA(level, code);

Use the DEFINE command to define an alias to simplify the syntax for invoking the UDF:
DEFINE CONCAT_COMMA com.hortonworks.udfs.CONCAT_COMMA();

Now invoke the UDF using the alias:

x = FOREACH logevents GENERATE CONCAT_COMMA(level, code);

Another UDF Example

There are two types of Pig User-Defined Functions:

·         Evaluation functions
·         Filter functions
Most Pig functions are evaluation functions: arguments are passed in, a result is evaluated, and then returned.

Evaluation functions have a generic that specifies the return type of the function. Filter functions do not use a generic because they can only return a boolean.

Filter functions are used for filtering relations.

Filter Functions

A filter function is like an evaluation function, except it can only return true or false. Writing a UDF filter function enables the FILTER BY command to use the function.

A custom filter function is written by:

·         Extending the FilterFunc class
·         Overriding the exec function and returning a boolean
To demonstrate, the following MinimumMoneyFlow class keeps only the stocks whose daily money flow is at least a specified minimum:

package stockudfs;

import java.io.IOException;

import org.apache.pig.FilterFunc;
import org.apache.pig.data.Tuple;

public class MinimumMoneyFlow extends FilterFunc {
    private long minFlow;

    // Pig passes constructor arguments as strings from the DEFINE statement
    public MinimumMoneyFlow(String minFlow) {
        this.minFlow = Long.parseLong(minFlow);
    }

    @Override
    public Boolean exec(Tuple input) throws IOException {
        Object value1 = input.get(0); // volume
        Object value2 = input.get(1); // high
        Object value3 = input.get(2); // low
        Object value4 = input.get(3); // close
        if (value1 == null || value2 == null ||
                value3 == null || value4 == null) {
            return false; // discard tuples with missing fields
        }
        long volume = Long.parseLong(value1.toString());
        double high = Double.parseDouble(value2.toString());
        double low = Double.parseDouble(value3.toString());
        double close = Double.parseDouble(value4.toString());
        // Money flow = typical price ((high + low + close) / 3) * volume
        double currentFlow = ((high + low + close) / 3) * volume;
        return currentFlow >= minFlow;
    }
}

The following Pig script invokes the MinimumMoneyFlow function, setting the minimum flow to $100,000,000:

register /root/java/workspace/Pig/stockudfs.jar;

DEFINE MONEYFLOW stockudfs.MinimumMoneyFlow('100000000');

stockdata = LOAD 'stocksA/NYSE_daily_prices_A.csv' USING PigStorage(',')
    AS (exchange:chararray, symbol:chararray, date:chararray,
        open:float, high:float, low:float, close:float, volume:int);

m = FILTER stockdata BY MONEYFLOW(volume, high, low, close);

Accumulator UDFs

When a UDF implements Accumulator, Pig does not read in all the data at once
but instead reads in only a subset of the records and passes them to the accumulate method of the UDF.

Writing an Accumulator UDF

To write an Accumulator function, extend EvalFunc and implement Accumulator. This means both an exec method and an accumulate method must be written. If those two methods have the exact same behavior, you can instead extend AccumulatorEvalFunc and implement only the accumulate function; a sketch of this approach follows the list below.

Accumulator contains three methods:

·         accumulate
·         getValue
·         cleanup
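A minimal sketch of the AccumulatorEvalFunc approach (the RUNNING_SUM class is hypothetical; it assumes the first field of each grouped tuple is numeric):

package com.hortonworks.udfs;

import java.io.IOException;

import org.apache.pig.AccumulatorEvalFunc;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;

// Hypothetical UDF: keeps a running total across batches of records.
// AccumulatorEvalFunc supplies exec, so only the Accumulator methods are written.
public class RUNNING_SUM extends AccumulatorEvalFunc<Long> {
    private long total = 0L;

    @Override
    public void accumulate(Tuple b) throws IOException {
        // Each call receives a bag holding the next batch of records
        DataBag bag = (DataBag) b.get(0);
        for (Tuple t : bag) {
            if (t != null && t.size() > 0 && t.get(0) != null) {
                total += Long.parseLong(t.get(0).toString());
            }
        }
    }

    @Override
    public Long getValue() {
        return total;
    }

    @Override
    public void cleanup() {
        total = 0L;
    }
}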
Here is the sequence of events when an Accumulator function is invoked:

• The accumulate method is passed a subset of the records. The default batch size is 20,000 records, but you can configure that number using the pig.accumulative.batchsize property.

• The method keeps getting re-invoked until all records have been passed to accumulate.

• The framework then invokes the getValue method, which is responsible for returning the result of the function.

• If the data passed in is a collection of bags in a nested foreach, then the cleanup method is invoked after getValue, and accumulate starts getting called again with the next bag of records.

To demonstrate, the following is a subset of the code of Pig's built-in COUNT function, showing its Accumulator implementation:

public class COUNT extends EvalFunc<Long>
        implements Algebraic, Accumulator<Long> {

    private long intermediateCount = 0L;

    public void accumulate(Tuple b) throws IOException {
        try {
            // Each call receives a bag containing the next batch of records
            DataBag bag = (DataBag) b.get(0);
            Iterator it = bag.iterator();
            while (it.hasNext()) {
                Tuple t = (Tuple) it.next();
                if (t != null && t.size() > 0 && t.get(0) != null) {
                    intermediateCount += 1;
                }
            }
        } catch (ExecException ee) {
            throw ee;
        }
    }

    // Called between bags in a nested foreach to reset the running count
    public void cleanup() {
        intermediateCount = 0L;
    }

    // Called after all batches have been accumulated
    public Long getValue() {
        return intermediateCount;
    }
}

Understanding Accumulator Behavior

Keep in mind that implementing Accumulator does not guarantee that the Pig framework will send records in small batches. That is why it is important
to implement the exec method also, in case the framework invokes the UDF with all the records at once.

How the framework decides when to accumulate and when to send all records at once depends on the usage of the UDF:

1. If multiple UDFs are invoked in a FOREACH statement, and all of the UDFs implement Accumulator, then the records will be sent in small batches.

2. If at least one UDF in a FOREACH statement does not implement Accumulator, then all of the function calls in that FOREACH statement will be executed with a single batch of records. This is because the entire bag of records has to be read into memory for at least one function, so there is no gain in accumulating records for the other UDFs in that statement.
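For example (a sketch using the employees relation from earlier): the built-in COUNT and SUM both implement Accumulator, so in the following FOREACH the grouped records are passed to the UDFs in batches:

agegroup = GROUP employees BY age;
x = FOREACH agegroup GENERATE group, COUNT(employees), SUM(employees.salary);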

Overview of Algebraic Functions

A common UDF type in Pig is the algebraic function, which takes in a group of tuples and returns a scalar value. Examples include built-in Pig functions like MAX, MIN, COUNT, SUM, and so on.

What makes a function algebraic is its ability to process a bag of tuples over multiple phases, instead of in a single phase where the entire bag is passed to the function at once.

To write an algebraic function, implement the Algebraic interface, which contains three methods:

getInitial
Returns a String that represents the class name of the Initial function.
getIntermed
Returns a String that represents the class name of the Intermediate function.
getFinal
Returns a String that represents the class name of the Final function.
In a situation where an algebraic function makes sense, developers can realize the benefit of the MapReduce framework being able to use a Combiner to minimize network traffic and efficiently process a large number of tuples.

Algebraic functions are similar to Accumulator functions: they implement Algebraic AND extend EvalFunc. If an algebraic function is invoked in a FOREACH statement along with a non-algebraic function, then the algebraic function will be invoked normally and will ignore its algebraic implementation.

Example of an Algebraic Function

public class COUNT extends EvalFunc<Long>
        implements Algebraic, Accumulator<Long> {

  public Long exec(Tuple input) throws IOException {
    return count(input);
  }

  // Each getter returns the class name of the function for that phase
  public String getInitial() {
    return Initial.class.getName();
  }
  public String getIntermed() {
    return Intermed.class.getName();
  }
  public String getFinal() {
    return Final.class.getName();
  }

  // Initial runs map-side: counts the tuples in one input bag
  static public class Initial extends EvalFunc<Tuple> {
    public Tuple exec(Tuple input) throws IOException {
      return TupleFactory.getInstance().newTuple(count(input));
    }
  }
  // Intermed runs in the Combiner: sums the partial counts
  static public class Intermed extends EvalFunc<Tuple> {
    public Tuple exec(Tuple input) throws IOException {
      return TupleFactory.getInstance().newTuple(sum(input));
    }
  }
  // Final runs reduce-side: returns the total count
  static public class Final extends EvalFunc<Long> {
    public Long exec(Tuple input) throws IOException {
      return sum(input);
    }
  }

  static protected Long count(Tuple input) throws ExecException {
    Object values = input.get(0);
    if (values instanceof DataBag)
      return ((DataBag) values).size();
    else if (values instanceof Map)
      return new Long(((Map) values).size());
    else
      throw new ExecException("Cannot count a "
          + (values == null ? "null" : values.getClass().getName()));
  }

  static protected Long sum(Tuple input) throws ExecException {
    DataBag values = (DataBag) input.get(0);
    long sum = 0;
    for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
      Tuple t = it.next();
      sum += (Long) t.get(0);
    }
    return sum;
  }
}

This is only a subset of the actual COUNT UDF. View the entire source code here:
https://svn.apache.org/repos/asf/pig/trunk/src/org/apache/pig/builtin/COUNT.java
Tags: hdp