Data Warehouse Implementation

This article describes the technical implementation details of exporting data from Emcien and storing it inside your Enterprise Data Warehouse (EDW). The goal is to create actionable task lists, or next-step reports, to help a business group tackle the challenging problem of Supply Chain Order Priority.

Below we will:

  • Lay out the data lifecycle and end goal
  • Define a basic database schema optimized for our use case
  • Outline the scaffolding code used between Emcien and the EDW
  • Demonstrate the full Emcien-to-EDW lifecycle

In this article, we use the Ruby programming language and the MySQL database. Any programming language or database may be used; just follow the concepts outlined below using your language and database of choice.

All sample code is available at the Emcien Data Warehouse Loader repository. The sample uses the HTTParty gem for HTTP requests, the trollop gem for command-line flag parsing, and the mysql2 gem for database integration.
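The later snippets call a few small helpers — `API.get`/`API.post`, a `Config` object, and a `die` function — that are defined in the repository. As a rough, hypothetical sketch of their shape (using Ruby's stdlib Net::HTTP here rather than HTTParty, and with invented `EMCIEN_HOST`/`EMCIEN_TOKEN` environment variable names):

```ruby
require 'net/http'
require 'json'
require 'uri'

# Minimal stand-in for the repository's API helper.
# The host/token environment variable names are assumptions for illustration.
module API
  HOST  = ENV.fetch('EMCIEN_HOST', 'emcien.example.com')
  TOKEN = ENV.fetch('EMCIEN_TOKEN', 'changeme')

  def self.default_get_headers
    { 'Authorization' => "Token token=\"#{TOKEN}\"", 'Accept' => 'application/json' }
  end

  def self.get(path)
    uri = URI("https://#{HOST}#{path}")
    req = Net::HTTP::Get.new(uri, default_get_headers)
    res = Net::HTTP.start(uri.host, uri.port, use_ssl: true) { |http| http.request(req) }
    parse(res)
  end

  def self.post(path, params)
    uri = URI("https://#{HOST}#{path}")
    req = Net::HTTP::Post.new(uri, default_get_headers.merge('Content-Type' => 'application/json'))
    req.body = params.to_json
    res = Net::HTTP.start(uri.host, uri.port, use_ssl: true) { |http| http.request(req) }
    parse(res)
  end

  # Wrap the parsed JSON body so callers can read both `.code` and hash keys,
  # matching how the snippets below use the responses
  def self.parse(res)
    body = JSON.parse(res.body) rescue {}
    body.define_singleton_method(:code) { res.code.to_i }
    body
  end
end

# Exit loudly on failure; hook in your syslog server or other alerting here
def die(response)
  warn "Emcien API request failed: #{response.inspect}"
  exit(1)
end
```

The Stage 1–6 snippets then call `API.get`, `API.post`, and `die` exactly as shown in the repository code.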


Data Lifecycle and End Goal

Our objective is to deliver an actionable task list to the Order Expeditor business team using customer data analyzed daily by Emcien. Our deliverable will be a list of Product Names which have a high probability of being 'Critical'.

This article will focus on the integration points between each system. The content and outline of the actionable task list is not the focus of this article. For more information about other use cases Emcien solves, please visit our website.


Reporting Database Schema

How do you store the data? That is typically the first question a data warehouse team asks. In our example, the deliverable is a list of products with a high probability of Critical Order Priority.

Below is a database schema. In our example we use MySQL, but any database can be used.

Source Data

Our source data is the public Super Store data set. This dataset is converted to the Emcien Long format for our use case. 

As you can see, this data contains both numeric and categorical fields. Emcien is ideal for datasets with a mixture of data types.
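Assuming "long format" means one transaction/category/value row per attribute (a common layout; verify against the Emcien documentation for your version), the pivot from a wide row can be sketched in Ruby. The column names here are illustrative, not the actual Super Store schema:

```ruby
# Pivot a wide row (one column per attribute) into long rows
# of [transaction_id, category, value] — one row per attribute.
def to_long(row, id_column: 'Order ID')
  id = row.fetch(id_column)
  row.reject { |col, _| col == id_column }
     .map { |category, value| [id, category, value] }
end

wide = { 'Order ID' => '1001', 'Ship Mode' => 'Second Class', 'Sales' => '261.96' }
to_long(wide)
# => [["1001", "Ship Mode", "Second Class"], ["1001", "Sales", "261.96"]]
```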

Create a Database
CREATE DATABASE emcien_data_warehouse_for_order_priority;
Create a Table to track each Analysis

This use case is a daily, recurring process. This table tracks the daily Emcien runs. We will use this table to query the latest results or to track historic data.

CREATE TABLE emcien_analyses (
  id int unsigned auto_increment PRIMARY KEY,
  emcien_id integer,
  name varchar(255),
  created_at datetime,
  state varchar(255)
);
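For example, when building the daily task list you might read only the most recent completed analysis. A sketch of such a query (the `state` value of 'ready' is an assumption; match it to whatever state string your process stores):

```sql
SELECT emcien_id, name, created_at
FROM emcien_analyses
WHERE state = 'ready'
ORDER BY created_at DESC
LIMIT 1;
```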
Create a Table & Index for Emcien Prediction Rules

Our task list is based on Emcien Prediction Rules. This table will store the rules in a denormalized fashion. By 'flattening' the relationships between rules, items, categories, and outcomes, the downstream reporting tool (such as Tableau or Qlik) can run more efficient queries.

Below is an example Emcien Prediction Rule. The basic components of a rule are item(s), probability, and outcome.
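To make this concrete, here is a hypothetical rule and the high-probability filter behind the task list, sketched in Ruby. The field names mirror the API fields used in Stage 4; the values are invented:

```ruby
# Hypothetical rules, shaped like the API fields used in Stage 4 (values invented)
rules = [
  { 'item_names' => '{Ship Mode=Same Day}', 'cprob' => 0.91, 'outcome_item_name' => 'Order Priority=Critical' },
  { 'item_names' => '{Ship Mode=Standard}', 'cprob' => 0.12, 'outcome_item_name' => 'Order Priority=Critical' }
]

# Keep only rules that predict 'Critical' with high probability
critical = rules.select do |rule|
  rule['outcome_item_name'].include?('Critical') && rule['cprob'] >= 0.75
end

critical.map { |rule| rule['item_names'][1..-2] }
# => ["Ship Mode=Same Day"]
```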

This schema is very dependent on your use case: you would craft this table to deliver the data your business users need. For our use case, the schema below is ideal.

CREATE TABLE emcien_rules (
  id integer unsigned auto_increment PRIMARY KEY,
  name varchar(255),
  size integer,
  category varchar(255),
  frequency integer,
  lift float,
  outcome varchar(255),
  probability float,
  emcien_id integer,

  INDEX rule_name_category (name, category)
);
Create a Table & Index for Emcien Recommendations

A powerful part of Emcien is the ability to make accurate, highly-relevant recommendations. This table will store the recommendation for each product and act as an option when expediting order shipments.

Below is an example Emcien Product Recommendation. The basic components are Product A, probability, and Product B.

CREATE TABLE emcien_product_recommendations (
  id integer unsigned auto_increment PRIMARY KEY,
  product_name_a varchar(255),
  product_name_b varchar(255),
  product_recommendation_strength float,
  product_recommendation_frequency integer,
  product_recommendation_conditional_probability float,

  INDEX recommendation_name_a (product_name_a)
);

This database schema is optimized for the customer value use case. The table designs can be altered to meet your requirements. Use SQL to shape the table as needed.


Integration Code

Our analysis and load process contains six stages. Our code will manage each piece separately. 

Stage 1: Banding Numeric Data

This stage converts numeric ranges into smart 'bins'. This tactic is very useful when making predictions because you often care about a range of numbers, not a specific floating point value.
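As a simplified illustration of the idea (Bandit computes the break points for you; this sketch only shows how values map onto breaks, with invented break values):

```ruby
# Map a numeric value onto a labeled band, given sorted break points.
# The break values here are invented for illustration.
def band(value, breaks)
  upper = breaks.find { |b| value <= b }
  upper ? "<= #{upper}" : "> #{breaks.last}"
end

breaks = [10.0, 100.0, 1000.0]   # e.g. parsed from the Bandit breaks file CSV
band(5.0, breaks)     # => "<= 10.0"
band(250.0, breaks)   # => "<= 1000.0"
band(5000.0, breaks)  # => "> 1000.0"
```

A prediction can then target the band (e.g. "Sales <= 1000.0") instead of an exact value.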

Below we POST our source data to the Bandit API to create a new Bandit Run. We check the status codes to verify the success of the request and `die`, or exit the program, if we have problems. In production you will want to log and alert if any part of this process dies; in the `die` function, add your syslog server or other alert-framework hooks.

create_bandit_run = API.post("/api/v1/runs", Config.bandit_params.to_h)
die(create_bandit_run) unless (create_bandit_run.code == 202)

bandit_run_id = create_bandit_run["data"]["id"]
bandit_run = API.get("/api/v1/runs/#{bandit_run_id}")
die(bandit_run) unless (bandit_run.code == 200)

Next we poll the status of the Bandit Run to determine when the job is complete. Once complete, we GET the breaks file, which will be used when building Rules. The breaks file is a simple CSV, usually less than 10 KB. See the Emcien Bandit documentation for more information about the breaks file.

while (bandit_run.code == 200 && bandit_run["data"]["state"] !~ /ready/)
  sleep(10)
  bandit_run = API.get("/api/v1/runs/#{bandit_run_id}")
end

breaks_text = HTTParty.get("https://#{bandit_run["links"]["breaks_file"]}", headers: API.default_get_headers)

Stage 2: Building Rules

Our use case is built upon prediction rules. This step POSTs source data to the Emcien engine. Again, we check the status codes to verify the success of the request and `die`, or exit the program, if we have problems.

create_rules = API.post("/api/v1/reports", params_with_breaks)
die(create_rules) unless (create_rules.code == 202)

report_id = create_rules["data"]["id"]
report = API.get("/api/v1/reports/#{report_id}")
die(report) unless (report.code == 200)

Here we poll the status of the Report while Emcien completes the analysis. This may take several minutes, so polling is necessary.

while (report.code == 200 && report["data"]["state"] !~ /ready/)
  if report["data"]["state"] =~ /failed/
    die("Failed ... #{report["data"]["state"]}")
  end
  
  sleep(10)
  report = API.get("/api/v1/reports/#{report_id}")
end

Stage 3: Storing the Rules Report

Now we have data from the Emcien engine and need to load it into our data warehouse. The first step is to connect to the database we will use for our task lists. Once connected, we will store the report's Emcien ID and name, along with a timestamp, so that we can track historic changes.

client = Mysql2::Client.new(host: Config.database.host, username: Config.database.user, password: Config.database.password)
client.select_db(Config.database.name)

# One placeholder per column after the auto-increment id: emcien_id, name, created_at, state
sql = client.prepare("INSERT INTO #{Config.database.reports_table["name"]} VALUES (NULL,?,?,NOW(),?)")
result = sql.execute(report['data']['id'], report['data']['name'], report['data']['state'])

Stage 4: Storing Rules

Your source data may produce thousands of rules. To store these rules in your data warehouse, we will page through the Emcien Rules API to fetch 100 rules at a time. 

$page = 1
$total_pages = 1

while $page <= $total_pages do
  # Fetch a page of Rules (pages are assumed to be 1-indexed)
  $rules = API.get("/api/v1/reports/#{report_id}/rules?page=#{$page}&size=100&filter[size]=1")
  $total_pages = $rules['meta']['pages_total']
  $total_rules = $rules['meta']['records_on_page']

  # Prepare the INSERT statement (one placeholder per column after the id)
  sql = client.prepare("INSERT INTO #{db.rules_table["name"]} VALUES (NULL,?,?,?,?,?,?,?,?)")

  # Iterate through each rule on this page
  $rule_idx = 0
  while $rule_idx < $total_rules do
    rule = $rules['data'][$rule_idx]

    # Format values for the BI report (strip the surrounding braces from names)
    name = rule['item_names'][1..-2]
    category = rule['category_names'][1..-2]
    frequency = rule['cluster_frequency'].to_i
    lift = rule['lift']
    outcome = rule['outcome_item_name']
    cprob = rule['cprob']

    # Store the rule in the data warehouse, tagged with the report's Emcien ID
    result = sql.execute(name, 1, category, frequency, lift, outcome, cprob, report_id)
    $rule_idx += 1
  end

  $page += 1
end

Stage 5: Building Product Recommendations

The second part of our use case relies on Product Recommendations. This step POSTs source data to the Emcien engine using parameters that omit the outcome category. Again, we check the status codes to verify the success of the request and `die`, or exit the program, if we have problems. Alerting and logging are up to your preferences.

create_recommendations = API.post("/api/v1/reports", affinity_params)
die(create_recommendations) unless (create_recommendations.code == 202)

report_id = create_recommendations["data"]["id"]
report = API.get("/api/v1/reports/#{report_id}")
die(report) unless (report.code == 200)

Here we poll the status of the Report while Emcien completes the analysis. This may take several minutes, so polling is necessary.

while (report.code == 200 && report["data"]["state"] !~ /ready/)
  if report["data"]["state"] =~ /failed/
    die("Failed ... #{report["data"]["state"]}")
  end

  sleep(10)
  report = API.get("/api/v1/reports/#{report_id}")
end

Stage 6: Storing Product Recommendations

Similar to storing rules, your source data may produce thousands of product recommendations. In this stage, we will page through the Emcien Product Recommendations API and store each recommendation into our data warehouse. 

An interesting note: Product Recommendations differ depending on which item is the recommending item. For example, when you buy a printer you usually buy ink; when you buy ink, you do not typically buy a new printer (you are replacing the ink). Since the probability of a recommendation depends on which item we start with, we will store each direction of the recommendation individually and calculate the probabilities on the fly.
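The printer/ink asymmetry can be made concrete with invented counts:

```ruby
# Invented counts: how often each item appears, and how often the pair appears together
printer_count = 50
ink_count     = 400
pair_count    = 40

# P(ink | printer): of the orders containing a printer, how many also had ink?
p_ink_given_printer = pair_count.to_f / printer_count   # => 0.8

# P(printer | ink): of the orders containing ink, how many also had a printer?
p_printer_given_ink = pair_count.to_f / ink_count       # => 0.1
```

Same pair, very different recommendation strength in each direction — which is why the loop below writes two rows per pair, one per direction.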

$recommendation_page = 1
$total_recommendation_pages = 1

while $recommendation_page <= $total_recommendation_pages do
  # Request a page of Recommendations (pages are assumed to be 1-indexed)
  $recommendations = API.get("/api/v1/reports/#{report_id}/clusters?page=#{$recommendation_page}&size=100&filter[size]=2")
  $total_recommendation_pages = $recommendations['meta']['pages_total']
  $total_recommendations = $recommendations['meta']['records_on_page']

  # Prepare the INSERT statement (one placeholder per column after the id)
  sql = client.prepare("INSERT INTO #{db.recommendation_table["name"]} VALUES (NULL,?,?,?,?,?)")

  # Iterate through each Recommendation on this page
  $recommendations_idx = 0
  while $recommendations_idx < $total_recommendations do
    $recommendation = $recommendations['data'][$recommendations_idx]

    # Recommendation Data (split arrays are zero-indexed)
    $names = $recommendation['item_names'].split("|")
    $ids = $recommendation['item_ids'].split("|")
    $strength = $recommendation['strength']
    $recommendation_frequency = $recommendation['count']

    # Product A Recommendation
    # Use the Emcien Item API to get Product A's frequency, then compute the
    # conditional probability of the pair given Product A
    $product_a = API.get("/api/v1/reports/#{report_id}/items/#{$ids[0]}")
    $product_a_frequency = $product_a['data']['transaction_count']
    $product_a_cprob = $recommendation_frequency.to_f / $product_a_frequency.to_f
    result = sql.execute($names[0], $names[1], $strength, $recommendation_frequency, $product_a_cprob)

    # Product B Recommendation
    # Use the Emcien Item API to get Product B's frequency, then compute the
    # conditional probability of the pair given Product B
    $product_b = API.get("/api/v1/reports/#{report_id}/items/#{$ids[1]}")
    $product_b_frequency = $product_b['data']['transaction_count']
    $product_b_cprob = $recommendation_frequency.to_f / $product_b_frequency.to_f
    result = sql.execute($names[1], $names[0], $strength, $recommendation_frequency, $product_b_cprob)

    $recommendations_idx += 1
  end

  $recommendation_page += 1
end

At this point you are done! We have built both Prediction Rules and Product Recommendations using the Emcien engine and loaded this data into your data warehouse for BI reporting and task lists.

Wrap up

To wrap up, we have implemented a six-stage process of interacting with Emcien via the RESTful APIs and loading data into the EDW for use by the business team. The basic framework outlined above can be shaped per use case and implementation scenario. Please feel free to view our reference code or contact support for additional help.