# Pentaho MapReduce

This job entry executes transformations as part of a Hadoop MapReduce job, instead of requiring a traditional Hadoop Java class.

A Hadoop MapReduce job can include any combination of the following transformation types:

* **Mapper transformation** (required): Converts input data into key/value pairs. A mapper applies a function to each element of the input and can also filter and sort the data.
* **Combiner transformation** (optional): Summarizes map output records that share the same key. This can reduce the amount of data written to disk and transmitted over the network.
* **Reducer transformation** (optional): Performs summary operations across keys (for example, counting occurrences) and outputs results.
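Conceptually, these three transformation types implement the classic MapReduce pattern. The following plain-Java sketch is a simulation for illustration only (it uses no Hadoop or PDI classes, and the names are made up for this example); it shows the roles a mapper, combiner, and reducer play in a word count:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class MapReduceRoles {
    // Mapper role: turn each input line into (word, 1) key/value pairs.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String word : line.toLowerCase().split("\\s+")) {
            if (!word.isEmpty()) pairs.add(Map.entry(word, 1));
        }
        return pairs;
    }

    // Combiner/Reducer role: sum the values that share the same key.
    static Map<String, Integer> reduce(List<Map.Entry<String, Integer>> pairs) {
        Map<String, Integer> totals = new TreeMap<>();
        for (Map.Entry<String, Integer> p : pairs) {
            totals.merge(p.getKey(), p.getValue(), Integer::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> mapped = new ArrayList<>();
        mapped.addAll(map("the quick brown fox"));
        mapped.addAll(map("the lazy dog"));
        // A combiner would run the same summation on each node's local map
        // output first, shrinking what crosses the network; the reducer then
        // produces the final totals across all nodes.
        System.out.println(reduce(mapped)); // {brown=1, dog=1, fox=1, lazy=1, quick=1, the=2}
    }
}
```

In a real Pentaho MapReduce job, each of these roles is a PDI transformation bounded by **MapReduce Input** and **MapReduce Output** steps rather than Java code.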

{% hint style="info" %}
This entry was formerly known as **Hadoop Transformation Job Executor**.
{% endhint %}

{% hint style="warning" %}
The **Hadoop job name** field on the **Cluster** tab is required.
{% endhint %}

### General

* **Entry name**: Specify the unique name of the job entry on the canvas. The default is **Pentaho MapReduce**.

### Options

The Pentaho MapReduce job entry includes several tabs to define transformations and configure the Hadoop cluster connection.

#### Mapper tab

![Pentaho MapReduce Mapper tab](https://773338310-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FYwnJ6Fexn4LZwKRHghPK%2Fuploads%2Fgit-blob-7fcafb2e7902f94e576b4e118ba6bb2bf54f825c%2FPDI%20PMR%20job%20Mapper%20tab.png?alt=media)

| Option               | Definition                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 |
| -------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| **Transformation**   | <p>Specify the transformation that performs the mapper functions by entering its path or clicking <strong>Browse</strong>.</p><p>If you select a transformation that shares the same root path as the current transformation, PDI inserts <code>${Internal.Entry.Current.Directory}</code> in place of the common root path.</p><p>If you are working with a repository, specify the transformation name. If you are not working with a repository, specify the transformation XML file name.</p><p><strong>Note:</strong> Transformations previously specified by reference are automatically converted to use the transformation name within the Pentaho Repository.</p> |
| **Input step name**  | The name of the step that receives mapping data from Hadoop. It must be a **MapReduce Input** step.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |
| **Output step name** | The name of the step that passes mapping output back to Hadoop. It must be a **MapReduce Output** step.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    |

#### Combiner tab

![Combiner tab, Pentaho MapReduce](https://773338310-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FYwnJ6Fexn4LZwKRHghPK%2Fuploads%2Fgit-blob-220aab8250d65088f8d9ccab99bb7bcc68213560%2FssPDIMapReduceEntry-CombinerTab.png?alt=media)

| Option                                        | Definition                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |
| --------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| **Transformation**                            | <p>Specify the transformation that performs the combiner functions by entering its path or clicking <strong>Browse</strong>.</p><p>You can use internal variables such as <code>${Internal.Entry.Current.Directory}</code> in the path.</p><p>If you are working with a repository, specify the transformation name. If you are not working with a repository, specify the transformation XML file name.</p><p><strong>Note:</strong> Transformations previously specified by reference are automatically converted to use the transformation name within the Pentaho Repository.</p> |
| **Input step name**                           | The name of the step that receives combiner data from Hadoop. It must be a **MapReduce Input** step.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |
| **Output step name**                          | The name of the step that passes combiner output back to Hadoop. It must be a **MapReduce Output** step.                                                                                                                                                                                                                                                                                                                                                                                                                                                                              |
| **Use single threaded transformation engine** | Use the single-threaded transformation execution engine to run the combiner transformation. This can reduce overhead when processing many small groups of output.                                                                                                                                                                                                                                                                                                                                                                                                                     |

#### Reducer tab

![Reducer tab, Pentaho MapReduce](https://773338310-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FYwnJ6Fexn4LZwKRHghPK%2Fuploads%2Fgit-blob-238461f0996a2f1fa41ba2e970a9431299bf61e0%2FssPDIMapReduceEntry-ReducerTab.png?alt=media)

| Option                                        | Definition                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           |
| --------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| **Transformation**                            | <p>Specify the transformation that performs the reducer functions by entering its path or clicking <strong>Browse</strong>.</p><p>You can use internal variables such as <code>${Internal.Entry.Current.Directory}</code> in the path.</p><p>If you are working with a repository, specify the transformation name. If you are not working with a repository, specify the transformation XML file name.</p><p><strong>Note:</strong> Transformations previously specified by reference are automatically converted to use the transformation name within the Pentaho Repository.</p> |
| **Input step name**                           | The name of the step that receives reducer data from Hadoop. It must be a **MapReduce Input** step.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |
| **Output step name**                          | The name of the step that passes reducer output back to Hadoop. It must be a **MapReduce Output** step.                                                                                                                                                                                                                                                                                                                                                                                                                                                                              |
| **Use single threaded transformation engine** | Use the single-threaded transformation execution engine to run the reducer transformation. This can reduce overhead when processing many small groups of output.                                                                                                                                                                                                                                                                                                                                                                                                                     |

#### Job Setup tab

![Pentaho MapReduce Job setup tab](https://773338310-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FYwnJ6Fexn4LZwKRHghPK%2Fuploads%2Fgit-blob-1c270712b67fcbf1805e9418b9dc5be67b3fca05%2FPDI%20PMR%20job%20Job%20setup%20tab.png?alt=media)

| Option                            | Definition                                                                                                                                                                                                                                                                                                                                                  |
| --------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| **Input path**                    | The input directory path on the Hadoop cluster that contains the source data (for example, `/wordcount/input`). You can provide multiple input directories as a comma-separated list. If you want to read from S3, use the S3A connector (`s3a://`). Connectors `s3` and `s3n` are not supported. See the Hadoop documentation about Amazon S3 for details. |
| **Output path**                   | The output directory path on the Hadoop cluster (for example, `/wordcount/output`). The output directory cannot exist before you run the MapReduce job. To write to S3, use the S3A connector (`s3a://`).                                                                                                                                                   |
| **Remove output path before job** | Remove the output path before scheduling the MapReduce job. **Note:** Do not use this option with S3. To clean an S3 output path, use an alternative entry, such as [Delete folders](https://pentaho-public.atlassian.net/wiki/spaces/EAI/pages/372703488/Delete+folders).                                                                                  |
| **Input format**                  | The Apache Hadoop class name that describes the input specification. For more information, see [InputFormat](http://hadoop.apache.org/docs/stable/api/index.html?org/apache/hadoop/mapreduce/lib/input/FileInputFormat.html).                                                                                                                               |
| **Output format**                 | The Apache Hadoop class name that describes the output specification. For more information, see [OutputFormat](http://hadoop.apache.org/docs/stable/api/index.html?org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.html).                                                                                                                           |
| **Ignore output of map key**      | Ignore the key output from the mapper transformation and replace it with `NullWritable`.                                                                                                                                                                                                                                                                    |
| **Ignore output of map value**    | Ignore the value output from the mapper transformation and replace it with `NullWritable`.                                                                                                                                                                                                                                                                  |
| **Ignore output of reduce key**   | Ignore the key output from combiner and/or reducer transformations and replace it with `NullWritable`. This option requires an actual reducer transformation; it does not apply when the Identity Reducer is used.                                                                                                                                          |
| **Ignore output of reduce value** | Ignore the value output from combiner and/or reducer transformations and replace it with `NullWritable`. This option requires an actual reducer transformation; it does not apply when the Identity Reducer is used.                                                                                                                                        |

#### Cluster tab

![Cluster tab, Pentaho MapReduce](https://773338310-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FYwnJ6Fexn4LZwKRHghPK%2Fuploads%2Fgit-blob-c4bc43cb830e5447b741642dff73dbe5528c747d%2FssPDIMapReduceEntry-ClusterTab.png?alt=media)

| Option                      | Definition                                                                                                                                                                                                                                                                                                                                                                                                               |
| --------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| **Hadoop job name**         | The Hadoop job name. This field is required.                                                                                                                                                                                                                                                                                                                                                                             |
| **Hadoop Cluster**          | <p>Select an existing Hadoop cluster configuration or create a new one.</p><ul><li>Click <strong>Edit</strong> to modify an existing configuration.</li><li>Click <strong>New</strong> to create a new configuration.</li></ul><p>For more information, see <a href="../extracting-data-into-pdi/connecting-to-a-hadoop-cluster-with-the-pdi-client-article">Connecting to a Hadoop cluster with the PDI client</a>.</p> |
| **Number of Mapper Tasks**  | The number of mapper tasks to assign to the job. Input size typically determines this value.                                                                                                                                                                                                                                                                                                                             |
| **Number of Reducer Tasks** | The number of reducer tasks to assign to the job. **Note:** If this is `0`, no reduce operation is performed, the mapper output becomes the job output, and combiner operations are not performed.                                                                                                                                                                                                                       |
| **Logging Interval**        | The number of seconds between log messages.                                                                                                                                                                                                                                                                                                                                                                              |
| **Enable Blocking**         | Force the job to wait until the Hadoop job completes before continuing. This is the only way for PDI to determine the Hadoop job's status. If you clear this option, PDI continues to the next job entry immediately, and error handling and conditional routing based on the Hadoop job's outcome do not work.                                                                                                           |

<details>

<summary>Hadoop cluster configuration fields (Edit/New)</summary>

When you click **Edit** or **New** next to **Hadoop Cluster**, the Hadoop cluster dialog box appears.

| Option                    | Definition                                                                                                                           |
| ------------------------- | ------------------------------------------------------------------------------------------------------------------------------------ |
| **Cluster Name**          | The name of the cluster configuration.                                                                                               |
| **Hostname** (HDFS)       | The hostname for the HDFS node.                                                                                                      |
| **Port** (HDFS)           | The port for the HDFS node.                                                                                                          |
| **Username** (HDFS)       | The username for the HDFS node.                                                                                                      |
| **Password** (HDFS)       | The password for the HDFS node.                                                                                                      |
| **Hostname** (JobTracker) | The hostname for the JobTracker node. If you have a separate JobTracker node, enter that hostname; otherwise, use the HDFS hostname. |
| **Port** (JobTracker)     | The port for the JobTracker. This port cannot be the same as the HDFS port.                                                          |
| **Hostname** (ZooKeeper)  | The hostname for the ZooKeeper node.                                                                                                 |
| **Port** (ZooKeeper)      | The port for the ZooKeeper node.                                                                                                     |
| **URL** (Oozie)           | A valid Oozie URL.                                                                                                                   |

After you set these options:

1. Click **Test** to verify the configuration.
2. Click **OK** to return to the **Cluster** tab.

</details>

#### User Defined tab

![User Defined tab, Pentaho MapReduce](https://773338310-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FYwnJ6Fexn4LZwKRHghPK%2Fuploads%2Fgit-blob-0f8006d7db0bb5070223677a81f809b75bdd0513%2FssPDIMapReduceEntry-UserDefinedTab.png?alt=media)

Use this tab to define user-defined parameters and variables.

| Column    | Definition                                                                                                                                                                                                                                                                                                                                                                                                                          |
| --------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| **Name**  | The name of the parameter or variable to set. To set a Java system property, prefix the name with `java.system` (for example, `java.system.SAMPLE_VARIABLE`). Variables set here override variables set in `kettle.properties`. For more information, see [Kettle Variables](https://docs.pentaho.com/pdia-data-integration/archived-merged-pages/transforming-data-with-pdi-archive/pdi-run-modifiers/variables/kettle-variables). |
| **Value** | The value to assign to the parameter or variable.                                                                                                                                                                                                                                                                                                                                                                                   |
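As a sketch of the `java.system` prefix behavior described above (the variable name `SAMPLE_VARIABLE` is hypothetical, and the `setProperty` call only simulates what PDI does when it processes the entry): a variable named `java.system.SAMPLE_VARIABLE` becomes a standard Java system property, which any code in the same JVM can read back:

```java
public class SystemPropertyDemo {
    public static void main(String[] args) {
        // Simulates the effect of a User Defined entry named
        // "java.system.SAMPLE_VARIABLE" (hypothetical name): the prefix is
        // stripped and the remainder is set as a JVM system property.
        System.setProperty("SAMPLE_VARIABLE", "some-value");

        // Any Java code running in the same JVM can then read it back.
        String value = System.getProperty("SAMPLE_VARIABLE");
        System.out.println(value); // prints "some-value"
    }
}
```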

### Workflows and related information

#### Use PDI outside and inside the Hadoop cluster

PDI can run both outside a Hadoop cluster and within cluster nodes.

* Outside the cluster, PDI can extract data from or load data into Hadoop HDFS, Hive, and HBase.
* Inside the cluster, PDI transformations can act as mapper and/or reducer tasks. This enables you to build MapReduce jobs visually.

#### Pentaho MapReduce workflow

PDI and Pentaho MapReduce enable you to pull data from a Hadoop cluster, transform it, and pass it back to the cluster.

**Build the mapper transformation**

Start by designing the mapper transformation.

* Create a PDI transformation.
* Add the Hadoop **MapReduce Input** and **MapReduce Output** steps.
* Configure both steps and connect them with hops.
* Name this transformation `Mapper`.

![Big Data Key Value Pair Example](https://773338310-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FYwnJ6Fexn4LZwKRHghPK%2Fuploads%2Fgit-blob-6a3ecb8f9b0472fb91c257827b32297d7ca69cad%2FBigDataKeyValuePairExample.png?alt=media)

Hadoop communicates in key/value pairs. The **MapReduce Input** step defines how key/value pairs from Hadoop are interpreted by PDI.

![MapReduce Input Step dialog](https://773338310-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FYwnJ6Fexn4LZwKRHghPK%2Fuploads%2Fgit-blob-7cfd55ee190f2236a21c0ed5ca87b765e64312e2%2FssPDIMapReduceInputStep.png?alt=media)

The **MapReduce Output** step passes output back to Hadoop.

![MapReduce Output Step dialog](https://773338310-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FYwnJ6Fexn4LZwKRHghPK%2Fuploads%2Fgit-blob-71d6fbd1edd7366758cd6eefe579243737c2255b%2FssPDIMapReduceOutputStep.png?alt=media)

**Build the job that runs MapReduce**

* Create a PDI job.
* Add the **Pentaho MapReduce** job entry.
* Configure the **Mapper** tab to reference your mapper transformation.
* Add supporting entries such as **Start** and success/failure handling entries.

![Transformation Job Workflow Word Count Example](https://773338310-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FYwnJ6Fexn4LZwKRHghPK%2Fuploads%2Fgit-blob-0acc47785235b1e402e3559b431a52728ba14a2d%2FJob_Workflow_word_count_example.png?alt=media)

![Pentaho MapReduce dialog Mapper tab](https://773338310-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FYwnJ6Fexn4LZwKRHghPK%2Fuploads%2Fgit-blob-887280b1d66dded97ed9f216544738ae4a88ae85%2FssPDIMapReduceEntry-MapperTab.png?alt=media)

#### Run a Hadoop job by using a Java class

PDI can also execute a Java class from a job. Use the **Hadoop Job Executor** job entry to configure and run a `.jar` file.

![Hadoop Job Executor Workflow](https://773338310-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FYwnJ6Fexn4LZwKRHghPK%2Fuploads%2Fgit-blob-7a928837cdb2a48c03d9eece0bf68e5e4b7fed96%2FHadoopJobExecutorWordCount.png?alt=media)

![Hadoop Job Executor dialog](https://773338310-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FYwnJ6Fexn4LZwKRHghPK%2Fuploads%2Fgit-blob-95caa0f6f603edbbabe96a361fd5b3e67a4956e8%2Fhadoopjobexecutor_wordcount.png?alt=media)

If you use Amazon Elastic MapReduce (EMR), you can use the **Amazon EMR Job Executor** job entry, which includes connection information for Amazon S3 and configuration options for EMR.

![Amazon EMR Job Executor job entry](https://773338310-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FYwnJ6Fexn4LZwKRHghPK%2Fuploads%2Fgit-blob-c1ff285f722d4e13bc46deecff07319e36933450%2FAmazonEMREntry.png?alt=media)

#### Hadoop to PDI data type conversion

The **Hadoop Job Executor** and **Pentaho MapReduce** entries include an advanced configuration mode where you must specify Hadoop input/output data types.

| PDI (Kettle) Data Type              | Apache Hadoop Data Type             |
| ----------------------------------- | ----------------------------------- |
| `java.lang.Integer`                 | `org.apache.hadoop.io.IntWritable`  |
| `java.lang.Long`                    | `org.apache.hadoop.io.IntWritable`  |
| `java.lang.Long`                    | `org.apache.hadoop.io.LongWritable` |
| `org.apache.hadoop.io.IntWritable`  | `java.lang.Long`                    |
| `java.lang.String`                  | `org.apache.hadoop.io.Text`         |
| `java.lang.String`                  | `org.apache.hadoop.io.IntWritable`  |
| `org.apache.hadoop.io.LongWritable` | `org.apache.hadoop.io.Text`         |
| `org.apache.hadoop.io.LongWritable` | `java.lang.Long`                    |
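When choosing between conversions, pick the Hadoop type that matches your data's range. For example, mapping a PDI `Long` to `IntWritable` implies a 64-to-32-bit narrowing, which this plain-Java sketch demonstrates (it uses a plain `int` cast to stand in for the conversion; no Hadoop classes are involved):

```java
public class NarrowingDemo {
    public static void main(String[] args) {
        // A PDI Long holds 64 bits; Hadoop's IntWritable holds only 32.
        long big = 4_000_000_000L;        // larger than Integer.MAX_VALUE
        int narrowed = (int) big;         // what a Long -> IntWritable mapping implies
        System.out.println(narrowed);     // -294967296: the value wrapped around
        // Prefer the Long -> LongWritable mapping when values may exceed
        // the 32-bit range.
    }
}
```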

#### Hadoop Hive-specific SQL limitations

Hive has limitations that can affect SQL queries, including:

* Outer joins are not supported.
* Each column can be used only once in a `SELECT` clause.
* Conditional joins can use only the `=` condition unless you use a `WHERE` clause.
* `INSERT` statements have specific syntax and limitations.

For details, see:

* <https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML>
* <https://cwiki.apache.org/confluence/display/Hive/Hive+Transactions>

#### Big data tutorials

Pentaho big data tutorials provide scenario-based examples that demonstrate integration between Pentaho and Hadoop by using a sample data set.

Videos:

* Loading data into Hadoop from outside the Hadoop cluster: <https://www.youtube.com/watch?v=Ylekzmd6TAc>
* Pentaho MapReduce overview (interactive design without scripts/code): <https://www.youtube.com/watch?v=KZe1UugxXcs>
