Wednesday, December 30, 2015

SQL Hints - Map Joins for better performance

Before jumping straight to map joins, it helps to build a little bit of background.

Imagine you have to write a program (assume it is 1990 and big data does not exist yet) that takes two FILES, reads them, joins them, and writes the result to another file. There may be better ways, but the most common approach that comes to mind is

  • Sort file A
  • Sort file B
  • Read A and B simultaneously, compare the keys, advance the file whose key is lagging (the two files may not have the same record count, like customer and order), and keep writing to the output file as keys match.

The key point is that unless the files are sorted, the join is not possible. This extends to the big data world as well: unless the data is sorted, the join simply cannot happen.
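As an illustration, here is a minimal sketch of that merge step in plain Java (hypothetical file names, assuming tab-delimited "key<TAB>value" rows already sorted by key):

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

public class SortedFileJoin {

    public static void main(String[] args) throws IOException {
        try (BufferedReader a = new BufferedReader(new FileReader("customer_sorted.txt"));
             BufferedReader b = new BufferedReader(new FileReader("order_sorted.txt"))) {
            String[] ra = next(a), rb = next(b);
            while (ra != null && rb != null) {
                int cmp = ra[0].compareTo(rb[0]);
                if (cmp < 0) {
                    ra = next(a);                       // customer key is lagging: advance file A
                } else if (cmp > 0) {
                    rb = next(b);                       // order key is lagging: advance file B
                } else {
                    // keys match: write the joined record
                    System.out.println(ra[0] + "\t" + ra[1] + "\t" + rb[1]);
                    rb = next(b);                       // a customer can have many orders
                }
            }
        }
    }

    // read one "key<TAB>value" line; null at end of file
    private static String[] next(BufferedReader r) throws IOException {
        String line = r.readLine();
        return line == null ? null : line.split("\t", 2);
    }
}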

Remember: Hive underneath is map reduce.

Let's think of the above scenario in the map-reduce world. A schematic representation would look like the one below.


Notice above that the mappers deal only with individual files; the sort and shuffle phase is where the records come together, get sorted, and have their keys matched. This is where it gets expensive.

A map join is basically a hint to do things a little differently when the situation allows it, i.e. one file is small enough to be saved to the distributed cache so that the join can happen in the map phase. Confusing?... the following diagram shows how it looks.


The small table in the distributed cache is held as a hash map, which supports fast lookups. Mappers 1 and 2 use the hash map in the distributed cache to look up values and join the associated attributes to their own dataset before going through the sort and shuffle. This reduces the load on the sort and shuffle.
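To make the idea concrete, here is a hypothetical sketch of what each mapper effectively does, with a plain Java HashMap standing in for the distributed cache (table contents and keys are illustrative only, not Hive's actual implementation):

import java.util.HashMap;
import java.util.Map;

public class MapSideJoinSketch {

    public static void main(String[] args) {
        // Small table (store key -> store name); in a real job this would be
        // loaded from the distributed cache in the mapper's setup phase.
        Map<String, String> storeLookup = new HashMap<>();
        storeLookup.put("S1", "Downtown");
        storeLookup.put("S2", "Airport");

        // Large table rows streaming through the mapper: "salesKey,storeKey,amount"
        String[] salesRows = {"1001,S1,25.00", "1002,S2,40.00", "1003,S1,12.50"};

        for (String row : salesRows) {
            String[] cols = row.split(",");
            String storeName = storeLookup.get(cols[1]);   // hash lookup instead of a reduce-side match
            if (storeName != null) {
                System.out.println(row + "," + storeName); // joined record emitted from the map phase
            }
        }
    }
}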

That's a 2,000-foot view of map joins.

Important Note:

Recent versions of Hive convert joins to map joins automatically when the table statistics are known to the optimizer. However, if statistics are not available, the optimizer won't know which table can be used as the hash map. Likewise, for subqueries/intermediate datasets the optimizer can't decide during the optimization phase whether a map join applies (the intermediate datasets are created at run time). This is where a human touch of giving map join hints can save a lot of time.
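If statistics are available and you would rather let the optimizer decide, automatic conversion is governed by settings along these lines (standard Hive properties; the size threshold below is only an example value):

set hive.auto.convert.join=true;
-- small-table size threshold in bytes below which a map join is attempted
set hive.auto.convert.join.noconditionaltask.size=10000000;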

Following is an example of map join

SELECT /*+ MAPJOIN(STORETABLE) */ COUNT(*) FROM
SALESTABLE JOIN STORETABLE ON (SALESTOREKEY = STOREKEY)

MAPJOIN is a hint keyword, similar to hints in the RDBMS world. If you are interested in understanding the concept in more detail, it may be worthwhile to write a map reduce job yourself that uses the distributed cache with a hash map.

Sunday, December 27, 2015

Hive Client Tools : Tools for query

With Hive evolving as the simplest way to access structured data in the Hadoop ecosystem, analysts often need a simple interface to run queries and view results. Thankfully there are many FREE tools available to do so. Following is a list of free tools, though not an exhaustive one. I am not rating these tools either, as there is always the possibility of a better one somewhere that I am not aware of.

Hive Client Library: [Command Line]
This is a non-UI tool, i.e. a command line interface, and provides all the features you need in a Hive environment. Just ssh to your cluster and run the command "hive" to start the Hive shell.

Beeline: [Command Line]
This is based on the SQLLine library. Given the Hive driver on the classpath and a JDBC URL, it can connect to HiveServer2 and run queries.

beeline> !connect jdbc:hive2://<host>:<port>/<db>;auth=noSasl hiveuser pass org.apache.hive.jdbc.HiveDriver

Hue: [Web Interface]
Hue has gotten really interesting over time. To get a feel for Hue, head to the Hue demo site.
The Hortonworks sandbox comes preinstalled with Hue (though an older version, which can be updated) and runs on port 8000, e.g. http://sandbox.hortonworks.com:8000/

Oracle SQL Developer: [Java Application]
Note: This only works with the Cloudera driver and with a specific version. It does not appear that there is long-term support to make it work with newer versions. If interested, you can refer to this blog about how to set up the environment.

IBM Data Studio: [Java Application]
Built on top of Eclipse, I liked this tool during the short period I worked with it. If you are interested in using it, please head here.

Squirrel: [Java Application]
This is one SQL editor that supports almost every database on the market with a JDBC driver. Getting it to work with Hive needs some amount of effort, but it's worth it considering Squirrel is written in Java and runs on Windows/Mac/Linux. For installation and configuration, I have posted some details here.

Aginity Hadoop Workbench: [Windows Application]
This is a Windows-native application and my favorite. It does not provide connectivity to Pig, Oozie, etc. like Hue does, but for writing and running queries it really excels. Aginity Workbench also comes with a file shell that can help in managing files on HDFS. It's free, though registration is required. http://www.aginity.com/workbench/hadoop/



Using Squirrel to query Hive


To use Squirrel, download the following jar files (I am using Hive 1.2.1). The first jar file is the actual driver and the others are its dependencies.



Assuming you already have Squirrel installed (if not, go to http://squirrel-sql.sourceforge.net/#installation ), go to Drivers and click on the + sign to register a new driver.

Add the downloaded jar files here.
Click on the hive-jdbc jar file and click "List Drivers".
Give the example URL as jdbc:hive2://localhost:10000/default

And click Ok.

Now create an alias, provide the server details and username, and connect.

You are good to go with Squirrel. I like the tool except for the fact that you cannot have the object browser and the SQL window side by side (or there may be a way I am not aware of).

Improving Performance for Hive Queries

If Hive is used as the interface for accessing TB- and PB-scale data, it is quite important to optimize queries to get them to run faster. Query optimization is directly related to the infrastructure, the size and organization of the data, the storage formats, and the data readers/processors, which opens up thousands of possibilities for how queries can be optimized.

Following are some suggestions that can increase the query performance without worrying too much about the data organization.


  • Set hive execution engine to spark
  • Set hive execution engine to Tez
  • Set table storage format to ORC
  • Use cost based query optimization
  • Use Vectorization for queries
  • Use Partition on data when possible
  • Force Situational Map Join. (More on this link)
Apart from these options there are tens of other scenario-driven ways to improve performance, such as map joins, bucketing and bucket-sorting the data, changing the VM and YARN memory configurations, etc.

Following are the details for some of the above options.


EXECUTION ENGINE: SPARK

Use Spark as the query engine for Hive: currently the Hadoop releases do not support a direct approach to setting the query engine to Spark (some level of effort is required to set it up). Refer to this article if you are interested. Remember this is somewhat new and there is a chance that some expected things may not work, so do a POC based on your needs before using it in production.

EXECUTION ENGINE: TEZ
If setting up Spark is a difficult route, use Tez as the query engine for Hive. Setting the query engine to Tez is as easy as issuing the command
set hive.execution.engine=tez

Or you can make the change permanent by editing your hive-site.xml, usually located in [/usr/hdp/current/hive-server2/conf/], or by changing the "hive.execution.engine" property to "tez" through Ambari.
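For reference, the hive-site.xml entry would look roughly like this (a sketch; the property name is standard, placement depends on your distribution):

<property>
    <name>hive.execution.engine</name>
    <value>tez</value>
</property>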

STORAGE FORMAT: ORC
ORC (Optimized Row Columnar) is an evolution of the RC (Record Columnar) format. Data is stored in stripes that are efficient to read. You can learn more about ORC here.

A summary of ORC: a collection of rows is stored in a single block of data, and inside the block the columns are stored together. For example, depending on the row size, a block may hold 20,000 rows, and within the block, column 1 data for all 20,000 rows is stored together, column 2 data for all 20,000 rows is stored together, and so on. The advantage of this format is that, based on the columns selected in a query, only the data for the required columns in a block is read rather than the entire row data. In my experience, ORC storage does not pose any challenges in terms of SQL features compared with text file storage; it just improves the performance.

Setting the storage format to ORC can be done in three ways.

>>While creating the table
create table myorctable (
  name string,
  age int

) stored as orc tblproperties ("orc.compress"="NONE");

>>By altering the table
ALTER TABLE myorctable SET FILEFORMAT ORC

>>By setting the default file format to ORC and then creating the tables.
SET hive.default.fileformat=Orc;
Setting the default file format can be done from the client application or in the hive config file. When creating a table stored as ORC, table properties such as the compression method get applied. If you are planning to set the default storage to ORC, you may want to take a look at this link for other default table properties and the config names for setting their values in the hive config file.
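For example, after setting the default, a plain CREATE TABLE should come out as ORC, which you can check with DESCRIBE FORMATTED (table name is illustrative):

SET hive.default.fileformat=Orc;
CREATE TABLE my_default_orc (name STRING, age INT);
-- the InputFormat/OutputFormat in the output should show the ORC classes
DESCRIBE FORMATTED my_default_orc;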

AUTOMATIC QUERY PLAN OPTIMIZATION
This is a feature borrowed from RDBMS or MPP (massively parallel processing) systems. It is predominantly helpful when queries have joins or multi-step operations. It is also commonly referred to as COST BASED QUERY OPTIMIZATION.

Using this feature has 2 parts.

  • Letting hive know that this feature should be used.
  • Making the table statistics available to hive.
>>Letting hive know that this feature should be used

Set the following parameters through the client or in hive configurations through Ambari.

set hive.cbo.enable=true;
set hive.compute.query.using.stats=true;
set hive.stats.fetch.column.stats=true;

set hive.stats.fetch.partition.stats=true;

>> Making the table statistics available to hive
For each table involved in the queries where you wish to use the CBQO feature, run the following SQL:

analyze table myorctable compute statistics for columns name, age;
--OR if using version 0.14 and beyond

analyze table myorctable compute statistics for columns;

Do this for all the tables involved in the query.
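To confirm that the optimizer is actually picking up the statistics, you can EXPLAIN a query before and after computing them (a sketch using the sample table above):

EXPLAIN
SELECT name, COUNT(*)
FROM myorctable
GROUP BY name;

-- With CBO enabled and statistics present, the plan operators are annotated
-- with row-count and data-size estimates instead of falling back to defaults.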


USE VECTORIZATION
This option helps only in certain scenarios. Vectorization basically batches rows and performs operations on them 1,024 rows at a time instead of processing one row at a time.

Vectorization can be enabled through the following settings

set hive.vectorized.execution.enabled = true;
set hive.vectorized.execution.reduce.enabled = true;

You can either give the above set commands through your client tool or enable them permanently in hive settings through Ambari.


USE DATA PARTITIONS

Hive partitions basically store the data in different folders based on the partition rules defined, and only the required data is accessed when query filters are used. The most common partition column is date.

Unlike a relational database, the partition columns, being common to a dataset, are not stored as columns in the data files, which also helps in reducing the data volume.

2 important tips:

  • Do not over-partition the data just because it reduces data volume.
  • Keep your most frequently used queries in mind while partitioning the data.

Following is a sample DDL of how partitions are used.

CREATE TABLE MY_WEBLOG (
    SESSION_ID BIGINT,
    URL STRING,
    LOG_TIMESTAMP STRING
)

PARTITIONED BY (LOG_DATE STRING, LOG_HOUR STRING) ;

To insert into a partitioned table, the insert statement goes like


INSERT INTO MY_WEBLOG
PARTITION (LOG_DATE = '2016-01-01', LOG_HOUR = '23')
SELECT  SESSION_ID, URL, LOG_TIMESTAMP
FROM    LOG_SOURCE

WHERE   LOG_DATE = '2016-01-01' AND LOG_HOUR = '23' ;

It is quirky to specify partition values every time there is an insert. Hive provides dynamic partition options so that inserts automatically partition the data.

INSERT INTO MY_WEBLOG
PARTITION (LOG_DATE, LOG_HOUR)
SELECT  SESSION_ID, URL, LOG_TIMESTAMP , LOG_DATE, LOG_HOUR
FROM    LOG_SOURCE;


Note: the partition columns are required to be at the end of the select column list. Remember, partition columns are not physical columns; they are grouping folders.

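Dynamic partition inserts are usually gated by a couple of settings; if the insert above complains about strict mode, something like the following is needed (standard Hive properties, defaults vary by version):

set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;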


Friday, December 25, 2015

Hive External Tables with Multiple Character Delimiter

Hive external tables allow creating reference tables without moving the data to a new location. I created a sample dataset with the delimiter "|^|". However, I soon realized that Hive delimited tables only allow a single character as the field delimiter, i.e. the following code does NOT work.


CREATE EXTERNAL TABLE TwitterFlatExternal
(
Column_1 STRING,
Column_2 STRING,
Column_3 STRING,
Column_4 STRING,
Column_5 STRING,
Column_6 STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|^|'
LINES TERMINATED BY '\n'

LOCATION '/user/dsarangi/twitterdataflat';


However, it is possible to achieve the same result using RegexSerDe. SerDe stands for serializer/deserializer; read more about it on Hive SerDe.

The regular expression in the regex SerDe basically has to capture each column in a regex group. The following is an example for a 6-column table with the delimiter "|^|".

CREATE EXTERNAL TABLE TwitterFlatExternalReg
(
Column_1 STRING,
Column_2 STRING,
Column_3 STRING,
Column_4 STRING,
Column_5 STRING,
Column_6 STRING
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
"input.regex" = "^(.*)\\|\\^\\|(.*)\\|\\^\\|(.*)\\|\\^\\|(.*)\\|\\^\\|(.*)\\|\\^\\|(.*)$",
"output.format.string" = "%1$s %2$s %3$s %4$s %5$s %6$s"
)
STORED AS TEXTFILE
LOCATION '/user/dsarangi/twitterdataflat';

Using regex is cool but has its pain points too. It is not a definitive approach, and the regex has to be really solid to make sure every row renders the required number of columns. You have to create the table with the regex and then do a select on the table to verify that the regex is actually rendering the columns. I use the regex101 site to test my regex.

Most important: when using the regex in Hive, use \\ (double backslash) for escaping; a single backslash does not work.

So when testing the regex on an online site, convert each single backslash to a double backslash before using it in the Hive query. For example, \|\^\| as tested on regex101 becomes \\|\\^\\| inside the Hive DDL string.



Thursday, December 24, 2015

Hello Twitter: Use Java to get sample data from Twitter

One thought that often strikes me when I try to do a POC for myself is where to get some web data for my tests. For a while I was lazy and downloaded a few hundred rows from here and there, but then I decided to end the quest. Following are some steps that will help you get sample Twitter data if you are looking for some.

Twitter provides APIs to download data from a data stream that Twitter refers to as the public stream. There are other means, like the firehose, that we are not going to look at.

Getting data from twitter is a 2 step process

  • Get authorization tokens from Twitter to access twitter data
  • Use some programming / scripting language to authorize and read streaming data from twitter
Get Authorization Tokens from Twitter
We need 4 things
  • Consumer Key
  • Consumer Secret
  • Access Token
  • Access Token Secret
Head to https://apps.twitter.com
Create an account if you don't have a twitter account. Else log in
Create an application, provide the details and give a dummy site if asked.
Head to "Keys and access tokens" tab. You would see the consumer key and consumer secret. You may have to click a button to see the Access Token and Access Token Secret.

Write a java program to use the authorization tokens and get the data

POM dependencies required

<dependency>
    <groupId>org.scribe</groupId>
    <artifactId>scribe</artifactId>
    <version>1.3.7</version>
</dependency>
<dependency>
    <groupId>org.json</groupId>
    <artifactId>json</artifactId>
    <version>20151123</version>
</dependency>

TwitterStreamReader.java

package twitter.data.stream;

import java.io.*;
import java.text.SimpleDateFormat;
import java.util.Calendar;

import org.scribe.builder.*;
import org.scribe.builder.api.*;
import org.scribe.model.*;
import org.scribe.oauth.*;

import org.json.*;

public class TwitterStreamReader {

    private static String STREAM_URL = "https://stream.twitter.com/1.1/statuses/filter.json";
    private static String CONSUMER_KEY="<your consumer key>";
    private static String CONSUMER_SECRET="<your consumer secret>";
    private static String ACCESS_TOKEN="<your access token>";
    private static String ACCESS_TOKEN_SECRET="<your access token secret>";
    private static String TRACK_KEYWORD= "NEWS USA";
    private static String OUTPUT_FOLDER="/Users/devashis/TwitterData/"; //Give full path
    private static int    NUM_OF_ROWS=10; //Set -1 to run forever
public void getData() {
    try {
        //Create the Oath
        OAuthService service = new ServiceBuilder()
                .provider(TwitterApi.class)
                .apiKey(CONSUMER_KEY)
                .apiSecret(CONSUMER_SECRET)
                .build();

        //Create the token
        Token twitterAccessToken = new Token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET);

        //Create the request
        OAuthRequest request = new OAuthRequest(Verb.POST, STREAM_URL);
        request.addHeader("version", "HTTP/1.1");
        request.addHeader("host", "stream.twitter.com");
        request.addHeader("user-agent", "Twitter Stream Reader");
        request.addBodyParameter("track", TRACK_KEYWORD);
        request.setConnectionKeepAlive(true);

        //Sign and send the request
        service.signRequest(twitterAccessToken, request);
        Response response = request.send();

        // Create a reader to read Twitter's stream
        BufferedReader reader = new BufferedReader(new InputStreamReader(response.getStream()));

       //Change the delimiter below if you want a different delimiter
        String delimiter = "|^|";

        //Prepare the file for writing to
        SimpleDateFormat dt = new SimpleDateFormat("yyyyMMddhhmmss");
        System.out.println(OUTPUT_FOLDER+ dt.format(Calendar.getInstance().getTime())+".txt");
        File file = new File(OUTPUT_FOLDER+ dt.format(Calendar.getInstance().getTime())+".txt");
        file.createNewFile();
        FileWriter fw = new FileWriter(file.getAbsoluteFile());
        BufferedWriter bw = new BufferedWriter(fw);


        String tweet;
        int counter=0;
        while (((tweet = reader.readLine()) != null) && (counter != NUM_OF_ROWS)) {
            //Uncomment the following line if you want the entire JSON and decide on what data you want
            //System.out.println(tweet);
            //Some JSON parsing to get the data required
            JSONObject obj = new JSONObject(tweet);
            String createdAt = obj.getString("created_at");
            String id = obj.getString("id_str");
            String tweetText = obj.getString("text").replaceAll("[\\t\\n\\r]"," ");
            String userId = obj.getJSONObject("user").getString("id_str");
            String screenName = obj.getJSONObject("user").getString("screen_name");
            int followerCount = obj.getJSONObject("user").getInt("followers_count");
            String outRow =createdAt+delimiter+id+delimiter+tweetText+delimiter+
                    userId+delimiter+screenName+delimiter+followerCount;
            bw.write(outRow);
            bw.newLine();
            System.out.println(outRow);
            counter++;
        }

        bw.close();

    } catch (Exception e) {
        e.printStackTrace();
    }
}
}
In order to play with JSON data directly in Pig and Hive, I later changed the code above to write the raw JSON to the file (instead of parsing it) so that Pig and Hive could consume the JSON files directly.
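If you go down that route, Hive can read the raw JSON files directly through a JSON SerDe. A minimal sketch, assuming the hive-hcatalog-core jar is available (the jar path and HDFS location below are illustrative and vary by distribution), picking just a few of the tweet fields:

ADD JAR /usr/hdp/current/hive-webhcat/share/hcatalog/hive-hcatalog-core.jar;

CREATE EXTERNAL TABLE tweets_json (
  created_at STRING,
  id_str STRING,
  text STRING
)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
STORED AS TEXTFILE
LOCATION '/user/dsarangi/twitterdatajson';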




Monday, December 21, 2015

Using Apache Hadoop Client Library from Mac local system to connect to remote hadoop cluster

In the process of writing map reduce jobs, one of the biggest frustrations is having to upload the jar to the cluster and then start the job. There are many ways to overcome this problem:
  • Automate the build system to actually put the jar on the cluster
  • Write a separate script that picks the jar from the build target folder and puts it on the cluster
  • Run map reduce jobs directly from the local system

To me the third approach looked to be the best, considering there is no hassle of doing an ssh into the cluster to run the job. I work with the HDP platform on a Mac, hence the following steps. If you are on Windows you may want to get Cygwin first and try this approach. [I do not guarantee that this approach works on Windows.]

I use a Hortonworks sandbox for my development purposes. However, I could not find the packaged Hadoop core distributions for download, so I switched to the Apache site. The following link has the downloads for all versions:

https://archive.apache.org/dist/hadoop/core/

I used 2.6.2 and a direct link for download is hadoop-2.6.2.tar.gz

I created a directory under my user folder for the files.

mkdir ~/Apache

Assuming the file was downloaded to your Downloads folder:

mv ~/Downloads/hadoop-2.6.2.* ~/Apache/
cd ~/Apache
tar -zxvf hadoop-2.6.2.*

Now the files are extracted and available under /Users/<your username>/Apache.
Let's change the configurations so the client talks to the cluster when a command is run.

cd ~/Apache/hadoop-2.6.2/etc/hadoop

There are a bunch of files here. The three files that need changes are:
  • core-site.xml
  • hdfs-site.xml
  • yarn-site.xml

The best approach is to copy these files from your cluster and put them (replacing the existing ones) in your ~/Apache/hadoop-2.6.2/etc/hadoop folder. Unless the default locations have been changed, you can find these files on your cluster under the /etc/hadoop/conf/ folder.
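One way to pull them over (assuming a sandbox reachable as sandbox.hortonworks.com with root access; adjust the host and paths to your cluster):

scp root@sandbox.hortonworks.com:/etc/hadoop/conf/core-site.xml ~/Apache/hadoop-2.6.2/etc/hadoop/
scp root@sandbox.hortonworks.com:/etc/hadoop/conf/hdfs-site.xml ~/Apache/hadoop-2.6.2/etc/hadoop/
scp root@sandbox.hortonworks.com:/etc/hadoop/conf/yarn-site.xml ~/Apache/hadoop-2.6.2/etc/hadoop/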

Edit your bash profile file to include the environment variables required for hadoop client libraries and also to add the bin path.

vi ~/.bash_profile

Add the following lines to the end of the file. Replace the user name with your own username. If you are using a sandbox, you can set the cluster username to root.

export PATH=$PATH:/Users/<your local username>/Apache/hadoop-2.6.2/bin:/Users/<your local username>/Apache/hadoop-2.6.2/sbin
export HADOOP_USER_NAME=<your cluster username or root>
export HADOOP_HOME=/Users/<your username>/Apache/hadoop-2.6.2/
export HADOOP_CONF_DIR=/Users/<your username>/Apache/hadoop-2.6.2/etc/hadoop/

After saving the file, source .bash_profile to apply the changes in the current shell:

. ~/.bash_profile

With these changes you are ready to go from your local system.
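A quick sanity check from the local shell (the jar name, driver class and paths below are placeholders for your own job):

# list the cluster's HDFS root from your Mac
hdfs dfs -ls /

# submit a map reduce job straight from the local build output
hadoop jar target/my-mr-job.jar com.example.MyDriver /user/root/input /user/root/output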

Note: if you are using a Hortonworks sandbox on your local system with host-only networking, modify your hosts file /private/etc/hosts with a text editor and add a line like the one below. Change the IP address to your sandbox's host-only adapter IP address (run ifconfig in your sandbox to find it).

192.168.56.102  sandbox.hortonworks.com 

This is just to ensure that when core-site.xml, hdfs-site.xml and yarn-site.xml refer to sandbox.hortonworks.com as the host name, those references resolve to the sandbox correctly.


Saturday, December 19, 2015

Hive on Spark With HDP 2.3.2

It took me a while to figure this out; I hope this guide helps others. I am using HDP 2.3.2 on Windows Azure but not using any Windows-specific commands, so this should work with HDP 2.3.2 on any OS.

If using a sandbox, please make sure that the VirtualBox appliance has a host-only network and a NAT adapter attached. Host-only is required to connect to the sandbox from outside, and NAT is required for the sandbox to talk to the internet. Look for a previous post on this blog about how to do it.

Setup Maven for build


This may not be required for everybody, but in my case the build kept warning that my Maven was older than 3.3.3, so I decided to get a later version.

sudo su
export PATH=./:$PATH
wget https://archive.apache.org/dist/maven/maven-3/3.3.9/binaries/apache-maven-3.3.9-bin.tar.gz
tar -zxvf apache-maven-3.3.9-bin.tar.gz
export M2_HOME=./apache-maven-3.3.9
export PATH=$M2_HOME/bin:$PATH

Test if maven is setup correctly

which mvn

This should show mvn resolving to ./apache-maven-3.3.9/bin

Note: If you still get build errors, you may want to modify make-distribution.sh

vi make-distribution.sh
look for

MVN="$SPARK_HOME/build/mvn"

and replace with the following. Put your path to maven appropriately.

export M2_HOME=/usr/hdp/build/sparkbuild/apache-maven-3.3.9/

MVN="/usr/hdp/build/sparkbuild/apache-maven-3.3.9/bin/mvn"


Get the source files for spark and extract

wget http://www.apache.org/dyn/closer.lua/spark/spark-1.4.1/spark-1.4.1.tgz
tar -zxvf spark-1.4.1.tgz

If you get an error from the tar command, the tar file is likely corrupt; I had to switch links. Make sure the Spark version downloaded is 1.4.1.

Start the build with instructions not to build with the Hive dependency. You may leave out ",parquet-provided" in the command below if you don't plan to use Parquet file storage.

cd spark-1.4.1
make-distribution.sh --name "hadoop2-without-hive" --tgz "-Pyarn,hadoop-provided,hadoop-2.4,parquet-provided"

Note: Watch the screen for 30 seconds for any confirmation message, else go for a walk as this is going to run for a while.

The build process creates a file with name spark-1.4.1-bin-hadoop2-without-hive.tgz


Configure the environment

Extract and move the binaries to hdp root

tar -zxvf spark-1.4.1-bin-hadoop2-without-hive.tgz
mv spark-1.4.1-bin-hadoop2-without-hive/ /usr/hdp/spark-1.4.1

Add the required hadoop libraries to Spark class path configuration

cd /usr/hdp/spark-1.4.1/conf
cp spark-env.sh.template spark-env.sh

vi spark-env.sh

Add the following line to the end of "spark-env.sh". Do not replace $(hadoop classpath) with anything; the line needs to be added to spark-env.sh exactly as it is.

export SPARK_DIST_CLASSPATH=$(hadoop classpath)

There is an issue in mapred-site.xml that leads to a startup failure with a "bad substitution" message. Remove the offending path from the mapred config file.

vi /etc/hadoop/conf/mapred-site.xml 

search for mapreduce.application.classpath in editor
Remove the text
:/usr/hdp/${hdp.version}/hadoop/lib/hadoop-lzo-0.6.0.${hdp.version}.jar 
from the value

Create the log directory for spark
mkdir /tmp/spark-events

Change the Yarn Scheduler class to Fair Scheduler. You can do this through Ambari or by making changes to yarn-site.xml

vi /etc/hadoop/conf/yarn-site.xml

Search for name "yarn.resourcemanager.scheduler.class" and change the value from
"org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler"
to
"org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler"

Alternatively you can change the scheduler value through Ambari. Screenshot below


Restart the YARN, Hive and MapReduce services. Oozie may need a restart as well.


Test if the setup was successful

Start hive cli as hive user

sudo -u hive hive

At the hive prompt give the following commands

set spark.home=/usr/hdp/spark-1.4.1;
set hive.execution.engine=spark;
set spark.master=yarn-client;
set spark.eventLog.enabled=true;
set spark.eventLog.dir=/tmp/spark-events;
set spark.executor.memory=512m;
set spark.serializer=org.apache.spark.serializer.KryoSerializer;

Assuming the default database has the tables sample_07 and sample_08, issue the following SQL at the hive prompt. It is a sample cross join query to make sure Hive is using Spark.

create table sparkytable as select a.* from sample_07 a inner join sample_08 b;

While the query is running, in another terminal you can follow the hive cli log by using

tail -f /tmp/hive/hive.log

You would notice in the logs that hive brings up a spark client to run the queries.

Note: The first query may appear to run very slowly as Spark requests YARN to start a container. Subsequent queries should run faster.

Make the changes permanent

Fire up a browser and bring up Ambari. Log in using admin/admin. Go to Configs -> Advanced for Hive and add the parameters to hive-site.xml.

spark.master=yarn-client;
spark.eventLog.enabled=true;
spark.eventLog.dir=/tmp/spark-events;
spark.executor.memory=512m;
spark.serializer=org.apache.spark.serializer.KryoSerializer;

Screenshot from my Ambari



Or you can even directly add them to hive-site.xml

Notice that the above parameters do not include the hive.execution.engine parameter. Restart Hive.

Now you can choose at run time between MR, Tez or Spark as the execution engine by using the following statements in the hive CLI or your favorite SQL editor connected to Hive.

set hive.execution.engine=spark;
set hive.execution.engine=tez;
set hive.execution.engine=mr;

You can verify that these engines are actually used by starting a query and then checking the YARN Resource Manager UI.




Thursday, December 17, 2015

Hello HDFS : Connecting to HDFS and Performing File Operations using Java

Following are some code snippets for connecting to a remote HDFS and performing file operations.

I used a Hortonworks sandbox to play with. You may want to check my previous posts about how to configure the network for the Hortonworks sandbox so it can be accessed from your local system. Link

I am using a Hortonworks 2.2 sandbox that can be downloaded from HDP 2.2


  • Configure development environment
  • Code
  • Test

Configure the development environment:

The best way to get the libraries is through maven. The POM file is below

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>HDP2_2</groupId>
    <artifactId>dev</artifactId>
    <version>1.0-SNAPSHOT</version>
    <repositories>
        <repository>
            <id>repo.hortonworks.com</id>
            <name>Hortonworks HDP Maven Repository</name>
            <url>http://repo.hortonworks.com/content/repositories/releases/</url>
        </repository>

    </repositories>

    <dependencies>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.6.0.2.2.0.0-2041</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.0.2.2.0.0-2041</version>
        </dependency>
        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
            <version>1.2</version>
        </dependency>
        <dependency>
            <groupId>org.mortbay.jetty</groupId>
            <artifactId>jetty</artifactId>
            <version>6.1.26</version>
        </dependency>
        <dependency>
            <groupId>org.mortbay.jetty</groupId>
            <artifactId>jetty-util</artifactId>
            <version>6.1.26</version>
        </dependency>
    </dependencies>



</project>



Following are the three classes that I created for my Hello World to HDFS. You may write it differently.
I am eventually creating a directory in HDFS named MyDirectory. Using the instance of HDFSAccess below, you can do a host of things that can be done on a filesystem. Change the IP address and user name in AppProps.



AppProps.java

package com.self.train.hdfs;

public class AppProps {

    public static final String HDFS_URL = "hdfs://192.168.56.102:8020";
    public static final String hdfsCorePath="/etc/hadoop/conf/core-site.xml";
    public static final String hdfsSitePath="/etc/hadoop/conf/hdfs-site.xml";
    public static final String hadoopUser="root";

    public static void init(){
        System.setProperty("HADOOP_USER_NAME", hadoopUser);
    }

}

HDFSAccess.java

package com.self.train.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;


public class HDFSAccess {
    private Configuration conf;

    /*    Constructor    */
    public HDFSAccess(String HDFSUrl, String corePath, String sitePath) {
        conf = new Configuration();
        conf.addResource(new Path(corePath));
        conf.addResource(new Path(sitePath));
        conf.set("fs.defaultFS", HDFSUrl);
    }
    

    public boolean createDirectory(String dirName){

        Path dir = new Path(dirName);
        try {
            FileSystem fs = FileSystem.get(conf);
            if (!fs.exists(dir)) {
                fs.mkdirs(dir);
            }else{
                System.out.println("Directory already exists with name:"+ dirName);
                return false;
            }

        }catch(IOException e){
            System.out.println("Exception encountered while creating directory. Details: \n"+ e.getMessage());
            return false;
        }
        System.out.println("Directory created with name:"+ dirName);
        return true;
    }
}


Stub.java (For testing)

package com.self.train.hdfs;

public class Stub {

    public static void main(String args[]){
        AppProps.init();
        HDFSAccess ha = new HDFSAccess(AppProps.HDFS_URL, AppProps.hdfsCorePath, AppProps.hdfsSitePath);
        ha.createDirectory("MyDirectory");
    }
}
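As a quick extension of the same idea, the configuration built in HDFSAccess can drive other filesystem operations as well, for example copying a local file into the new directory (a sketch; the local and HDFS paths are illustrative):

package com.self.train.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class UploadStub {

    public static void main(String args[]) throws Exception {
        AppProps.init();

        // Same configuration approach as HDFSAccess
        Configuration conf = new Configuration();
        conf.addResource(new Path(AppProps.hdfsCorePath));
        conf.addResource(new Path(AppProps.hdfsSitePath));
        conf.set("fs.defaultFS", AppProps.HDFS_URL);

        FileSystem fs = FileSystem.get(conf);
        // Copy a local file into the directory created by Stub
        fs.copyFromLocalFile(new Path("/tmp/local-sample.txt"), new Path("MyDirectory/sample.txt"));
        fs.close();
    }
}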

Thursday, October 15, 2015

"Spark SQL Client" : Do Spark SQL Using a Query Editor

With mild hacks, Aginity Workbench for Hadoop can be used as a client to run Spark SQL queries.


How do I get Spark SQL?
If you are looking to deploy a herd of elephants to churn the forest, then you already know how to get it. If you are looking for an infant elephant to snuggle and get a feel for, the Hortonworks sandbox 2.3 will do it. Download a 2.3 sandbox from Hortonworks. I have another post about setting up a host-only network with VirtualBox and the Hortonworks sandbox: Link

Please note: without a host-only network, external clients cannot connect to the sandbox. Make sure the sandbox is configured correctly. Link


Assuming: We have a working environment now.


Some Concepts (Feel free to correct me through comments if I am wrong)



  • For Hive or Spark clients to talk to and issue commands against those services, a thrift service is required, e.g. HiveServer/HiveServer2 are the thrift services external clients use for Hive
  • For Spark SQL to be accessible to external clients, the Spark thrift service has to be running
  • For Spark SQL to work, Spark should know about the Hive metastore, so we have to keep Spark informed of the Hive metastore configuration.

Confirm Hive is working
Start a hive shell. And fire a query




  • On the hortonworks sandbox use ALT + F5 and then log in using root/hadoop
  • issue the command "hive" to fire up a shell. 
  • "show tables;" should show the tables in default schema.
  • "select count(*) from sample_07;" one of the existing tables should confirm hive is working
Confirm Spark is working

Before bringing up the Spark shell, it is advisable to change the logging level from INFO to WARN, as otherwise Spark spits out a lot of text to the console.


Open the file /usr/hdp/current/spark-client/conf/log4j.properties and on the first line change the log level from INFO to WARN.


After the change my file looks like below





Fire up a spark-sql shell and run some commands to verify that Spark is working. The following screenshot from my environment confirms that it is working.


Note: If you have not changed your log level as mentioned above, the below screen would look very different





Start Spark thrift service

To avoid the Hive and Spark thrift servers colliding on the same port, we need to start the Spark thrift server on a different port.


Run the following command.
start-thriftserver.sh --master yarn --executor-memory 512m --hiveconf hive.server2.thrift.port=11000
Notice the output log file mentioned in the screenshot above.
Run the following command to tail the log and confirm that Spark SQL is actually working:
tail -f /var/log/spark/spark-root-org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-1-sandbox.hortonworks.com.out


So now the Spark SQL thrift service is running. Time to connect the client.
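If you want a command line check before wiring up a GUI client, beeline can hit the same thrift port (a sketch; the host name assumes the sandbox, and the port matches the 11000 used in the start command above):

beeline> !connect jdbc:hive2://sandbox.hortonworks.com:11000/default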

Use Aginity Workbench to connect to Spark

Get Aginity Workbench for Hadoop if you do not have it already; it's free.
Notice below the port number 11000. By default it is 10000, but since we changed it to 11000 while starting the thrift service, we connect on 11000.

After connecting, I could run a query and see the results. The screenshot below confirms that

How do we know that Spark SQL is actually running and giving us the results?
Look at the log tail that we started before; here is a screenshot.

So enjoy spark SQL on a nice client interface.