Nifi Expression Language and Custom Processors (on HDP)
Short Description:
Getting started with Nifi expression language and custom Nifi processors on HDP sandbox
Article
Getting started with Nifi expression language and custom Nifi processors on HDP sandbox
This tutorial is part of a webinar for partners on Hortonworks DataFlow. The recording will be made available separately.
Background
- For a primer on HDF, you can refer to the materials to get a basic background
- A basic tutorial on using Nifi on HDP sandbox is also available
Goals
- Build Nifi flow to analyze Nifi's network traffic using tcpdump. Use Expression Language to extract out source/target IPs/ports
- Build and use custom tcpdump processor to filter Nifi's source/target IPs/ports on HDP sandbox
- Note that:
- Nifi can be installed independent of HDP
- The custom processor can also be built on any machine where Java and Eclipse are installed
- Sandbox is being used for demo purposes, to have everything in one place
Pre-Requisites: Install Nifi on sandbox
- The lab is designed for the HDP Sandbox. Download the HDP Sandbox , import into VMWare Fusion and start the VM
- After it boots up, find the IP address of the VM and add an entry to your machine's hosts file, e.g.
- 192.168.191.241 sandbox.hortonworks.com sandbox
- Connect to the VM via SSH (root/hadoop), correct the /etc/hosts entry
- ssh root@sandbox.hortonworks.com
- Deploy Nifi Ambari service on sandbox by running below
- VERSION=`hdp-select status hadoop-client | sed 's/hadoop-client - \([0-9]\.[0-9]\).*/\1/'`
- sudo git clone https://github.com/abajwa-hw/ambari-nifi-service.git /var/lib/ambari-server/resources/stacks/HDP/$VERSION/services/NIFI
- #sandbox
- service ambari restart
- #non sandbox
- service ambari-server restart
- To install Nifi, start the 'Install Wizard': open the Ambari UI, then:
- On bottom left -> Actions -> Add service -> check NiFi server -> Next -> Next -> Change any config you like (e.g. install dir, port, setup_prebuilt or values in nifi.properties) -> Next -> Deploy. This will kick off the install, which will run for 5-10 minutes.
- Once installed, launch Nifi by opening its web UI (port 9090 in this setup)
Steps
Explore tcpdump
- Tcpdump is a common packet analyzer that runs under the command line. It allows the user to display TCP/IP and other packets being transmitted or received over a network to which the computer is attached. Full details can be found in the tcpdump man page.
- To install tcpdump on sandbox:
- yum install -y tcpdump
- Here is a common usage:
- tcpdump -n -nn
- On sandbox, this will output something like below for each network connection being made, showing:
- which socket (i.e. IP/port) was the source (to the left of >) and
- which was the target (to the right of >)
- 08:16:15.878652 IP 192.168.191.1.49270 > 192.168.191.144.9090: Flags [.], ack 2255, win 8174, options [nop,nop,TS val 1176961367 ecr 32747195], length 0
- In the example above:
- the source machine was 192.168.191.1 (port 49270) and
- the target machine was 192.168.191.144 (port 9090)
- Note that since Nifi is running on port 9090, by monitoring traffic to port 9090, we will be able to capture connections made by Nifi
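The extraction the flow performs later in this tutorial can be sketched in plain Java. The two regexes below are the same ones the ExtractText processor uses (a socket here is an IP plus port, i.e. five dot-separated numbers); the class and method names are illustrative only:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TcpdumpParse {
    // Socket = IP.port, e.g. 192.168.191.1.49270 (five dot-separated numbers)
    static final Pattern SRC = Pattern.compile("(\\d+\\.\\d+\\.\\d+\\.\\d+\\.\\d+)\\s+>");
    static final Pattern DST = Pattern.compile(">\\s+(\\d+\\.\\d+\\.\\d+\\.\\d+\\.\\d+)");

    // Return the first captured socket, or null if the line has no match
    public static String extract(Pattern p, String line) {
        Matcher m = p.matcher(line);
        return m.find() ? m.group(1) : null;
    }

    public static void main(String[] args) {
        String line = "08:16:15.878652 IP 192.168.191.1.49270 > 192.168.191.144.9090: "
                + "Flags [.], ack 2255, win 8174, length 0";
        System.out.println("src=" + extract(SRC, line)); // 192.168.191.1.49270
        System.out.println("dst=" + extract(DST, line)); // 192.168.191.144.9090
    }
}
```

Running this against the sample line above yields the source socket 192.168.191.1.49270 and destination socket 192.168.191.144.9090, matching the manual reading of the output.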
Build tcpdump flow using ExecuteProcess and EL
- Download the xml template for the flow that uses ExecuteProcess/EL to parse tcpdump output to your local laptop (not the sandbox)
- On the Nifi webui, import flow template:
- Import template by clicking on Templates (third icon from right) which will launch the 'Nifi Flow templates' popup
- Browse and navigate to wherever you downloaded TCPDump_EL_Exmple.xml on your local machine
- Click Import. Now the template should appear in 'Nifi Flow templates' popup window
- Close the popup window
- Instantiate the 'TCPDump EL Example' dashboard template:
- Drag/drop the Template icon (7th icon from left) onto the canvas so that a picklist popup appears
- Select 'TCPDump EL Example' and click Add
- Run the flow. After a few seconds you should see all the counters increase
- Overview of flow:
  - ExecuteProcess: runs tcpdump -n -nn
  - SplitText: splits the output into lines
  - ExtractText: extracts the src/dest sockets using regex Expression Language:
    - src.socket stores the socket before the > : (\d+\.\d+\.\d+\.\d+\.\d+)\s+>
    - dest.socket stores the socket after the > : >\s+(\d+\.\d+\.\d+\.\d+\.\d+)
  - RouteOnAttribute: filters by destination socket where port is 9090: web.server.dest = ${dest.socket:endsWith(".9090")}
  - LogAttribute: logs the attributes of each flow file
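The RouteOnAttribute predicate ${dest.socket:endsWith(".9090")} is just a string suffix check on the dest.socket attribute. A plain-Java sketch of the per-flowfile evaluation (the method name isWebServerDest is illustrative):

```java
public class RouteCheck {
    // Mirrors ${dest.socket:endsWith(".9090")} from RouteOnAttribute
    static boolean isWebServerDest(String destSocket) {
        return destSocket != null && destSocket.endsWith(".9090");
    }

    public static void main(String[] args) {
        System.out.println(isWebServerDest("192.168.191.144.9090")); // true
        System.out.println(isWebServerDest("192.168.191.1.49270"));  // false
    }
}
```

Only flow files whose destination port is 9090 (i.e. traffic to Nifi itself) are routed to the web.server.dest relationship; everything else is unmatched.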
- Check details of what events were logged:
  - Open the Provenance window (5th icon from top right)
  - In top right, filter by component type: LogAttribute, and click the 'Show lineage' icon of the first record (near top right)
  - Right click on Route > View details
  - Click the Content tab and click View
  - Notice that the destination socket for the event shows port 9090
- For more details on Nifi Expression Language, see the Nifi Expression Language Guide
- Stop the flow using the stop button
Build custom processor for tcpdump
- Set up your sandbox for development by installing VNC, Eclipse and Maven
- Download the Ambari service for VNC by running the below
- VERSION=`hdp-select status hadoop-client | sed 's/hadoop-client - \([0-9]\.[0-9]\).*/\1/'`
- sudo git clone https://github.com/hortonworks-gallery/ambari-vnc-service.git /var/lib/ambari-server/resources/stacks/HDP/$VERSION/services/VNCSERVER
- service ambari restart
- Once the status of HDFS/YARN has changed from a yellow question mark back to a green check mark, Ambari has finished restarting
- Setup Eclipse on the sandbox VM and remote desktop into it using an Ambari service for VNC
- In Ambari, open the Admin > Stacks and Services tab
- Deploy the service by selecting:
- VNC Server -> Add service -> Next -> Next -> Enter password (e.g. hadoop) -> Next -> Proceed Anyway -> Deploy
- Make sure the password is at least 6 characters or install will fail
- Connect to VNC from your local laptop using VNC viewer software (e.g. Tight VNC viewer, Chicken of the VNC, or just your browser)
- (Optional): To install maven manually instead:
- curl -o /etc/yum.repos.d/epel-apache-maven.repo https://repos.fedorapeople.org/repos/dchen/apache-maven/epel-apache-maven.repo
- yum -y install apache-maven-3.2*
- In general, when starting a new project you would use the mvn archetype to create a custom processor. Details are in the Nifi Developer Guide.
- Command to run the wizard:
- cd /tmp
- mvn archetype:generate -DarchetypeGroupId=org.apache.nifi -DarchetypeArtifactId=nifi-processor-bundle-archetype -DarchetypeVersion=0.2.1 -DnifiVersion=0.2.1
- Sample inputs to generate a maven project archetype skeleton.
- Define value for property 'groupId': : com.hortonworks
- Define value for property 'artifactId': : nifi-network-processors
- Define value for property 'version': 1.0-SNAPSHOT: :
- Define value for property 'artifactBaseName': : network
- Define value for property 'package': com.hortonworks.processors.network: :
- This will create an archetype maven project for a custom processor with the package name, artifactId, etc specified above.
- In this case we will download a previously built sample and walk through what changes you would need to make to the archetype to create a basic custom processor
- cd
- sudo git clone https://github.com/abajwa-hw/nifi-network-processor.git
- Open Eclipse using the shortcut on the Desktop
- Import to Eclipse
- File > Import > Maven > Existing Maven projects
- Browse > root > nifi-network-processor > OK > Finish
- Here is a summary of code changes made to the generated archetype to create the sample tcpdump processor:
- pom.xml: add the commons-io dependency (for utils)
- In src/main/resources/META-INF/services/org.apache.nifi.processor.Processor, add the processor's fully qualified class name
- In the processor class (GetTcpDumpAttributes.java):
  - Define the tags and description using @Tags and @CapabilityDescription, e.g.
    - //Define the processor tags and description which will be displayed on Nifi UI
    - @Tags({"fetch","tcpdump","tcp", "network"})
    - @CapabilityDescription("Reads output of tcpdump and outputs the results as a Flowfile")
    These would get displayed on the 'Add processor' screen of the Nifi UI
  - Define properties for the processor, e.g.
- //Define properties for the processor
- public static final PropertyDescriptor MY_PROPERTY = new PropertyDescriptor
- .Builder().name("My Property")
- .description("Example Property")
- .required(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
These would get displayed on the 'Properties' tab of the GetTcpDumpAttributes processor:
  - Define relationships for the processor, e.g.
- //Define relationships for the processor
- public static final Relationship SUCCESS_RELATIONSHIP = new Relationship.Builder()
- .name("success")
- .description("Success relationship")
- .build();
These would get displayed on the 'Settings' tab of the GetTcpDumpAttributes processor:
  - Any initializations to be done when Nifi starts would go in init()
  - onTrigger() is the main method to override to define the logic when a flow file is passed to our processor. This is where we parse a line of tcpdump output and store the src and destination sockets
  - In the test class under src/test/java, you can define a JUnit test to check that the processor is working correctly
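The core of what onTrigger() does here, turning one tcpdump line into src.socket/dest.socket attributes, can be sketched as plain Java. This is an illustrative reduction, not the sample's exact code; in the real processor, onTrigger would read the flow file content via the ProcessSession, attach the attributes with session.putAttribute, and transfer the flow file to the success relationship:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TcpdumpAttributes {
    // One pass: capture the source socket (before >) and destination socket (after >)
    static final Pattern SOCKETS = Pattern.compile(
        "(\\d+\\.\\d+\\.\\d+\\.\\d+\\.\\d+)\\s+>\\s+(\\d+\\.\\d+\\.\\d+\\.\\d+\\.\\d+)");

    // Parse one tcpdump output line into the attributes the processor would
    // put on the flow file; returns an empty map if the line does not match
    static Map<String, String> parse(String line) {
        Map<String, String> attrs = new HashMap<>();
        Matcher m = SOCKETS.matcher(line);
        if (m.find()) {
            attrs.put("src.socket", m.group(1));
            attrs.put("dest.socket", m.group(2));
        }
        return attrs;
    }
}
```

Keeping the parsing in a small side-effect-free method like this is also what makes the JUnit test straightforward: the test can feed in a canned tcpdump line and assert on the resulting attribute values without running a real tcpdump.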
- To run maven compile:
- In Eclipse, under 'Package Explorer' select 'network-analysis' and then click:
- Run > Run Configurations
- Then double click 'Maven Build'. It will prompt you for the configuration. Enter the below:
- Name:
nifi-network
- Base dir:
/root/nifi-network-processor
- Under 'Goals':
clean package
- Under Maven Runtime (scroll down to see this option), add the location of the existing mvn install using the steps below, as it runs faster than the embedded one:
- Configure > Add > click 'Directory' and navigate to the mvn install: /usr/share/apache-maven > OK > Finish > Select apache-maven > Apply > OK
- So your maven run configuration should look as below
- Click Apply > Run to start the compile
- To run JUnit to confirm the processor is working correctly:
- In Eclipse, under Package Explorer, select nifi-network-processors and then click: Run > Run as > JUnit test
- After a few seconds the test should pass and you should see below (in green):
- To see what happens if the test does not pass, try changing the value of dest.socket by prefixing the values with random digits (as highlighted below), save your changes and re-run JUnit
- This time you will see the test fail (in red below)
- Press Control-Z to undo your changes
- Confirm the nar file (Nifi library file for your processor) got built by the maven build
- ls -la ~/nifi-network-processor/nifi-network-nar/target/nifi-network-nar-1.0-SNAPSHOT.nar
- Deploy the nar into Nifi: copy the compiled nar file into Nifi lib dir and correct permissions
- cp ~/nifi-network-processor/nifi-network-nar/target/nifi-network-nar-1.0-SNAPSHOT.nar /opt/nifi-1.0.0.0-7/lib/
- chown nifi:hadoop /opt/nifi-1.0.0.0-7/lib/nifi-network-nar-1.0-SNAPSHOT.nar
- Restart Nifi from Ambari
- Download the xml template for the flow that uses the custom processor to parse tcpdump output to your local laptop (not the sandbox)
- Open Nifi UI and delete the existing flow by:
- Control-A to select all the components and right click on any processor and select Delete
- Import the custom processor flow template into Nifi:
- Import template by clicking on Templates (third icon from right) which will launch the 'Nifi Flow templates' popup
- Browse and navigate to wherever you downloaded TCPDump_Custom_Processor_Exmple.xml on your local machine
- Click Import. Now the template should appear in 'Nifi Flow templates' popup window
- Close the popup window
- Instantiate the 'TCPDump_Custom_Processor_Exmple' dashboard template:
- Drag/drop the Template icon (7th icon from left) onto the canvas so that a picklist popup appears
- Select 'TCPDump_Custom_Processor_Exmple' and click Add
- Run the flow. After a few seconds you should see all the counters increase
- Overview of flow:
  - ExecuteProcess: runs tcpdump -n -nn
  - SplitText: splits the output into lines
  - GetTcpDumpAttributes: extracts the src/dest sockets using the custom processor we built:
    - src.socket stores the socket before the > : (\d+\.\d+\.\d+\.\d+\.\d+)\s+>
    - dest.socket stores the socket after the > : >\s+(\d+\.\d+\.\d+\.\d+\.\d+)
  - RouteOnAttribute: filters by destination socket where port is 9090: web.server.dest = ${dest.socket:endsWith(".9090")}
  - LogAttribute: logs the attributes of each flow file
- Open Provenance window and repeat previous steps to confirm that the destination socket for the events shows port 9090
- You have successfully created flows to analyze network traffic using both Expression Language and a basic custom processor
Further reading