Scenario:

Suppose you have a dataset like the one below:

id,fname,lname,salary
1,Nitin,BS,10
2,Alex,P,20
3,Hrithik,R,25


The requirement is to increase the salary of every employee by 5%. You can do this with the custom transformation feature of Infoworks.
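As a quick reference for the expected result, here is a minimal plain-Python sketch (standard library only, no Spark or Infoworks involved) that applies the same 5% increase to the sample rows. It only illustrates the arithmetic; the helper name add_salary_updated is hypothetical.

```python
import csv
import io

# Sample dataset from the scenario above
SAMPLE_CSV = """id,fname,lname,salary
1,Nitin,BS,10
2,Alex,P,20
3,Hrithik,R,25
"""

RAISE_FACTOR = 1.05  # a 5% increase


def add_salary_updated(csv_text):
    """Return the rows with an extra salary_updated value (salary * 1.05)."""
    rows = list(csv.DictReader(io.StringIO(csv_text)))
    for row in rows:
        row["salary_updated"] = int(row["salary"]) * RAISE_FACTOR
    return rows


for row in add_salary_updated(SAMPLE_CSV):
    print(row["id"], row["salary"], row["salary_updated"])
```

Running it prints 10.5, 21.0 and 26.25 as the updated salaries, which are the values the pipeline is expected to write to salary_updated.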


Steps to perform:


Check that python_custom_executable_path in conf.properties points to the Python installation bundled with Infoworks. Otherwise, the default Python on the edge node is used and the transformation fails.

For example:


python_custom_executable_path=/opt/infoworks/resources/python36/bin
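If you want to check this setting from a script, the following sketch reads conf.properties and confirms that the configured directory contains a python executable. It is a simplified key=value parser (it does not handle every Java properties feature, such as line continuations), and both the helper names and the conf.properties location you pass in are assumptions.

```python
import os


def read_properties(path):
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    with open(path) as fh:
        for line in fh:
            line = line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, _, value = line.partition("=")
            props[key.strip()] = value.strip()
    return props


def check_python_path(conf_path):
    """Return the configured bin directory if it holds an executable python."""
    bin_dir = read_properties(conf_path).get("python_custom_executable_path")
    if not bin_dir:
        raise ValueError("python_custom_executable_path is not set")
    for name in ("python", "python3"):
        candidate = os.path.join(bin_dir, name)
        if os.path.isfile(candidate) and os.access(candidate, os.X_OK):
            return bin_dir
    raise ValueError("no python executable found in " + bin_dir)
```

Call check_python_path with the path to your conf.properties; it raises ValueError if the property is missing or the directory has no python executable.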


First, install the api egg:

$ python -m easy_install /opt/infoworks/df/python_scripts/api-1.0.egg
Processing api-1.0.egg
Copying api-1.0.egg to /opt/infoworks/resources/python36/lib/python3.6/site-packages
Adding api 1.0 to easy-install.pth file

Installed /opt/infoworks/resources/python36/lib/python3.6/site-packages/api-1.0.egg
Processing dependencies for api==1.0
Finished processing dependencies for api==1.0
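To confirm that the Infoworks Python can now see the api package, a small check like the following can help. It assumes the module path api.custom_transformation used by the example code in this article; is_importable is a hypothetical helper.

```python
import importlib.util


def is_importable(module_name):
    """Return True if module_name can be found by the current interpreter."""
    try:
        return importlib.util.find_spec(module_name) is not None
    except ModuleNotFoundError:
        # find_spec imports parent packages first; a missing parent raises here
        return False


# Run this with the Infoworks Python; it should print True after the install
print(is_importable("api.custom_transformation"))
```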



Create a directory structure on the edge node, for example:

mkdir -p /home/ec2-user/customtransformations/cusscripts


Here, cusscripts is the directory where you store your custom transformation Python code.

Write your custom transformation code. You can use $IW_HOME/examples/pipeline_extensions/custom_transformations.py as a reference.


Note:

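Write your custom transformation treating input_dataset as a Java object. Infoworks uses py4j internally, so the datasets your Python code receives are Java Dataset objects rather than PySpark DataFrames, and you call the Java API on them (for example withColumn(), col() and multiply()).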


My custom_transformation_example.py, saved in the cusscripts directory, looks like this:


from api.custom_transformation import CustomTransformation


class CustomTransformationSample(CustomTransformation):

    def __init__(self):
        self._spark_session = None
        self._user_properties = None

    def transform(self, input_dataset_map):
        # input_dataset_map holds the node's input datasets as Java objects (via py4j).
        # This node has a single input, so take the first dataset in the map.
        input_dataset = None
        for key in input_dataset_map:
            input_dataset = input_dataset_map[key]
            break
        if input_dataset is None:
            raise Exception("No input dataset was passed to the transformation")

        try:
            # Java Dataset/Column API: col("salary").multiply(1.05) adds 5% to salary
            input_dataset = input_dataset.withColumn(
                "salary_updated", input_dataset.col("salary").multiply(1.05))
        except Exception as e:
            raise Exception("Exception while adding 5% to the salary column: {}".format(e)) from e

        return input_dataset

    def initialise_context(self, spark_session, user_properties, processing_context):
        # Store the Spark session and user properties passed in by Infoworks
        self._spark_session = spark_session
        self._user_properties = user_properties
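Because transform() receives Java objects through py4j, it cannot simply be called with a PySpark DataFrame. One way to sanity-check the salary logic off-cluster is with small test doubles that mimic only the three Java methods the code uses (col, multiply, withColumn). FakeDataset and FakeColumn below are hypothetical stand-ins, not Spark or Infoworks classes, and the standalone transform function mirrors the logic of the class above rather than importing it.

```python
# Hypothetical test doubles: they mimic only the Java Dataset/Column methods
# used by transform(). They are NOT Spark or Infoworks classes.
class FakeColumn:
    def __init__(self, fn):
        self._fn = fn  # function: row dict -> value

    def multiply(self, factor):
        return FakeColumn(lambda row: self._fn(row) * factor)

    def evaluate(self, row):
        return self._fn(row)


class FakeDataset:
    def __init__(self, rows):
        self.rows = rows

    def col(self, name):
        return FakeColumn(lambda row: row[name])

    def withColumn(self, name, column):
        # Like Spark, return a new dataset instead of mutating this one
        return FakeDataset([dict(row, **{name: column.evaluate(row)}) for row in self.rows])


def transform(input_dataset_map):
    # Mirrors the salary logic of CustomTransformationSample.transform()
    input_dataset = next(iter(input_dataset_map.values()))
    return input_dataset.withColumn(
        "salary_updated", input_dataset.col("salary").multiply(1.05))


rows = [{"id": 1, "salary": 10}, {"id": 2, "salary": 20}, {"id": 3, "salary": 25}]
result = transform({"input": FakeDataset(rows)})
print([row["salary_updated"] for row in result.rows])  # [10.5, 21.0, 26.25]
```

This catches typos in column names or method calls on the Python side; it does not replace a test run of the pipeline on Spark.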






Navigate to the parent directory:

cd /home/ec2-user/customtransformations/


Create the setup.py file needed to build the Python egg:


from setuptools import setup, find_packages

setup(
    name="cusscripts",
    version="0.1",
    packages=find_packages(),
)


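In the above setup.py, I named the package cusscripts because that directory contains the custom transformation code and is what gets packaged into the egg file. Note that find_packages() only picks up directories that contain an __init__.py file, which is why one is created in the steps that follow.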



Source the Infoworks environment so that the python command points to the Infoworks Python:

source $IW_HOME/bin/env.sh
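To confirm the environment took effect, this sketch checks that the first python on PATH lives in the Infoworks bin directory. The directory used is the example value of python_custom_executable_path given earlier; python_on_path is a hypothetical helper.

```python
import os
import shutil


def python_on_path(expected_bin_dir, path=None):
    """Return True if the first 'python' on the search path is in expected_bin_dir."""
    found = shutil.which("python", path=path)  # path=None searches $PATH
    if found is None:
        return False
    found_dir = os.path.realpath(os.path.dirname(found))
    return found_dir == os.path.realpath(expected_bin_dir)


# After sourcing env.sh this is expected to print True
print(python_on_path("/opt/infoworks/resources/python36/bin"))
```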


Next, navigate to the cusscripts directory and create an empty __init__.py file:

touch __init__.py
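If you prefer to script the layout steps (package directory, empty __init__.py and setup.py), here is a minimal standard-library sketch. The scaffold helper is hypothetical, and you still need to copy custom_transformation_example.py into the package directory yourself.

```python
from pathlib import Path

SETUP_PY_TEMPLATE = '''from setuptools import setup, find_packages

setup(
    name="{name}",
    version="0.1",
    packages=find_packages(),
)
'''


def scaffold(base_dir, package="cusscripts"):
    """Create <base_dir>/<package>/__init__.py and <base_dir>/setup.py."""
    base = Path(base_dir)
    pkg = base / package
    pkg.mkdir(parents=True, exist_ok=True)  # same as mkdir -p
    (pkg / "__init__.py").touch()           # same as touch __init__.py
    setup_py = base / "setup.py"
    if not setup_py.exists():               # don't overwrite an existing setup.py
        setup_py.write_text(SETUP_PY_TEMPLATE.format(name=package))
    return pkg


# Example path from this article; adjust for your environment
# scaffold("/home/ec2-user/customtransformations")
```

Running it twice is harmless: existing directories, __init__.py and setup.py are left in place.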


Go back to the parent directory (cd /home/ec2-user/customtransformations/) and run the following command to build the egg file:

python setup.py bdist_egg


This creates a subdirectory called dist that contains the egg file. From the dist directory, install the egg:


$ python -m easy_install cusscripts-0.1-py3.6.egg
Processing cusscripts-0.1-py3.6.egg
Copying cusscripts-0.1-py3.6.egg to /opt/infoworks/resources/python36/lib/python3.6/site-packages
Adding cusscripts 0.1 to easy-install.pth file

Installed /opt/infoworks/resources/python36/lib/python3.6/site-packages/cusscripts-0.1-py3.6.egg
Processing dependencies for cusscripts==0.1
Finished processing dependencies for cusscripts==0.1
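The build-and-install steps above could also be scripted. This sketch assumes it is run with the Infoworks Python (after sourcing env.sh, so sys.executable is that interpreter) and that setuptools in that environment provides bdist_egg and easy_install, as the manual steps do; find_egg and build_and_install are hypothetical helpers.

```python
import glob
import os
import subprocess
import sys


def find_egg(dist_dir, name="cusscripts"):
    """Return the newest <name>-*.egg in dist_dir, or raise if there is none."""
    eggs = glob.glob(os.path.join(dist_dir, name + "-*.egg"))
    if not eggs:
        raise FileNotFoundError("no {}-*.egg in {}".format(name, dist_dir))
    return max(eggs, key=os.path.getmtime)


def build_and_install(project_dir, name="cusscripts"):
    """Run the same build and install commands with the current interpreter."""
    subprocess.run([sys.executable, "setup.py", "bdist_egg"], cwd=project_dir, check=True)
    egg = find_egg(os.path.join(project_dir, "dist"), name)
    subprocess.run([sys.executable, "-m", "easy_install", egg], check=True)
    return egg


# build_and_install("/home/ec2-user/customtransformations")
```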


In the Infoworks UI, go to the Admin section and click external scripts to open the pipeline extensions page.


Click Add an extension and configure it as follows:



Set the extension type to custom transformation and the execution type to python, and give the transformation a name and an alias of your choice.


Provide the path of the folder where your egg file is located, that is, the dist directory created above. In this example, it is /home/ec2-user/customtransformations/dist.


Next, provide the class name in the following form:

<python_package>.<python_module>.<class_name> 


In my case, it is cusscripts.custom_transformation_example.CustomTransformationSample.
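Before entering the class name in the UI, you can check that the dotted path resolves in the Infoworks Python. This sketch only verifies that <python_package>.<python_module> imports and exposes <class_name>; how Infoworks itself loads the class is not covered here. resolve_class is a hypothetical helper.

```python
import importlib


def resolve_class(dotted_path):
    """Import <package>.<module> and return its <class_name> attribute."""
    module_path, _, class_name = dotted_path.rpartition(".")
    if not module_path:
        raise ValueError("expected <package>.<module>.<class_name>, got " + dotted_path)
    module = importlib.import_module(module_path)
    return getattr(module, class_name)  # AttributeError if the class is missing


# Run with the Infoworks Python after installing the egg:
# print(resolve_class("cusscripts.custom_transformation_example.CustomTransformationSample"))
```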



Once the pipeline extension is configured, make it available to the relevant domain using the Manage Artifacts option of that domain.




Create a pipeline that uses your custom transformation.

Configure the custom transformation node to use the extension configured above, and click Save.

Add the output columns to be mapped.




Finally, configure the target node and build the pipeline.


The target Hive table should now contain a new column, salary_updated, with each salary increased by 5%:



hive> select * from Custom_transformations_schema.Custom_transformations_table;
OK
3    Hrithik    R    25    26.25    e0c39f74ecc3c8b850d5a45ea3c4aafd    2020-03-18 03:07:30.574    2020-03-18 03:07:30.574    I    0
2    Alex    P    20    21.0    52bfd5a747947ece3720d283edbc3954    2020-03-18 03:07:30.574    2020-03-18 03:07:30.574    I    0
1    Nitin    BS    10    10.5    a17c95d9db02cf15c1a4e352edee4e4c    2020-03-18 03:07:30.574    2020-03-18 03:07:30.574    I    0
Time taken: 10.489 seconds, Fetched: 3 row(s)

hive> describe Custom_transformations_schema.Custom_transformations_table;
OK
id                      int
fname                   string
lname                   string
salary                  int
salary_updated          double
ziw_row_id              string
ziw_created_timestamp   timestamp
ziw_updated_timestamp   timestamp
ziw_status_flag         string
ziw_sec_p               int
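For a quick programmatic check of the result, the sketch below compares salary_updated with salary * 1.05 using a floating-point tolerance. The rows are the (id, salary, salary_updated) values from the query output above; for a real table you would load the exported rows instead. check_raise is a hypothetical helper.

```python
import math

# (id, salary, salary_updated) copied from the Hive query output above
HIVE_ROWS = [(3, 25, 26.25), (2, 20, 21.0), (1, 10, 10.5)]


def check_raise(rows, factor=1.05):
    """Return the ids whose salary_updated is not salary * factor (within tolerance)."""
    return [row_id for row_id, salary, updated in rows
            if not math.isclose(updated, salary * factor, rel_tol=1e-9)]


print(check_raise(HIVE_ROWS))  # [] means every row received the 5% increase
```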