# -*- coding: utf-8 -*-
"""pricing.ipynb
Automatically generated by Colaboratory.
Original file is located at
https://colab.research.google.com/drive/1iWB_rdCTiV3vZgasLbkDHTiLNV_vuxup
"""
import random
from math import isnan

import numpy as np
import pandas as pd
from scipy.stats import zscore
from sklearn.metrics import accuracy_score
from sortedcontainers import SortedDict
from tqdm import tqdm

import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches

import arviz as az
import pymc3 as pm
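# Ground truth of the simulation: tiered quantity discounts, standard unit
# prices per product, and per-country / per-product uncertainty penalties.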
quantity_discount = SortedDict({
    0: 0,
    10: 0.05,
    20: 0.10,
    40: 0.20,
    80: 0.30,
    160: 0.40,
})
product_std_price = {
    '1': 5,
    '2': 10,
    '3': 20,
    '4': 40,
    '5': 80,
}
country_uncertainty = {'a':0.10, 'b':0.05, 'c':0.01}
country_ratio = ['a']*5 + ['b']*30 + ['c']*65
product_uncertainty = {'1':0.10, '2':0.08, '3':0.07, '4':0.01, '5':0.01}
product_ratio = ['1']*15 + ['2']*5 + ['3']*5 + ['4']*50 + ['5']*25
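# The *_ratio lists encode sampling weights: random.choice over a list with
# repeated elements draws 'a' 5%, 'b' 30% and 'c' 65% of the time (and
# analogously for the product ids).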
def invlogit(x):
    return np.exp(x) / (1 + np.exp(x))
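# Sanity check of the inverse-logit curve used throughout: it crosses 0.5 at
# x = 0 and saturates towards 0 and 1.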
x = list(range(-7, 7))
y = [invlogit(i) for i in x]
plt.plot(x, y)
plt.ylabel('Probabilities')
plt.axhline(0.5, alpha=0.3, color='red')
plt.axvline(0, alpha=0.3, color='red')
plt.show()
def discounted_price(p_id, quantity):
    # Walk the tiers in ascending order; `discount` ends up at the largest
    # bracket that does not exceed `quantity` (the 0 tier always matches).
    for key in quantity_discount:
        if key > quantity:
            continue
        discount = quantity_discount[key]
    # Additional discounts and other effects not captured in the data.
    unknown_component = random.random() * 0.05
    return product_std_price[p_id] * (1 - discount) - product_std_price[p_id] * unknown_component
def determine_success(p_id, unit_price, country, noise):
    # Offers priced low relative to the standard price score low and convert.
    ratio = unit_price / product_std_price[p_id]
    score = invlogit(ratio * 3 - 2) - country_uncertainty[country] - product_uncertainty[p_id]
    if score < 0.5 + noise * random.random():
        return 1
    return 0
def generate_opportunity(id_num, noise=0.15):
    country = random.choice(country_ratio)
    p_id = random.choice(product_ratio)
    amount = random.expovariate(0.01)  # ordered quantity, mean 100 units
    unit_price = discounted_price(p_id, amount)
    status = determine_success(p_id, unit_price, country, noise)
    return [id_num, unit_price, p_id, amount, country, status]
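# Illustrative smoke test (not in the original notebook): draw a few
# synthetic opportunities to eyeball the field layout
# [id, unit_price, p_id, amount, country, status].
for _row in (generate_opportunity(_i) for _i in range(3)):
    print(_row)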
N = 500
opportunities = list()
for i in range(N):
    opportunities.append(generate_opportunity(i, 0.1))
opps = pd.DataFrame(opportunities)
opps.columns = ['id', 'unit_price', 'p_id', 'amount', 'country', 'status']
opps['p_cty'] = opps['p_id'].astype(str) + opps.country  # product-country pairs
print(sum(opps.status) / len(opps))  # overall conversion rate
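# Encode the categorical columns as integer codes so they can index the
# per-group coefficient vectors in the model below.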
status = np.array(opps['status'])
unit_price = np.array(opps['unit_price']) #euro
labels_country, _ = pd.factorize(opps['country'], sort=True)
country = np.array(labels_country)
labels_products, _ = pd.factorize(opps['p_id'], sort=True)
product_id = np.array(labels_products)
labels_p_cty, _ = pd.factorize(opps['p_cty'], sort=True)
p_cty = np.array(labels_p_cty)
normalized_offers = opps.groupby(['p_id']).unit_price.transform(lambda x: zscore(x, ddof=0))
opps['std_price'] = opps['p_id'].map(product_std_price)
max_normalized_offers = opps.groupby(['p_id']).apply(lambda x: x['unit_price'] / x['std_price'])
max_normalized_offers.index = max_normalized_offers.index.get_level_values(1)
max_normalized_offers.sort_index(inplace=True)
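# Two price normalizations: a per-product z-score (normalized_offers) and the
# ratio of the offered price to the product's standard price
# (max_normalized_offers); the model below uses the ratio version.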
fig, (ax1, ax2) = plt.subplots(1, 2)
fig.set_size_inches(8, 8)
title_font = {'fontname':'Arial', 'size':'10', 'color':'black', 'weight':'normal'}
ax1.scatter(opps.country, max_normalized_offers, c=opps.status, alpha=0.1)
ax1.set_xlabel('Country', **title_font)
ax1.set_ylabel('Normalized price', **title_font)
ax2.scatter(opps.p_id, max_normalized_offers, c=opps.status, alpha=0.1)
ax2.set_xlabel('Product ID', **title_font)
plt.show()
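# Hold out the first 20% of opportunities as a test set; rows are generated
# i.i.d., so no shuffling is needed.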
N_test = int(len(opps) * 0.2)
def train_test(array, N_test=N_test):
    # Plain slices work for both numpy arrays and pandas Series
    # (the original trailing-comma tuple indexing breaks on Series).
    return array[N_test:], array[:N_test]
train_status, test_status = train_test(status)
train_product_id, test_product_id = train_test(product_id)
train_country, test_country = train_test(country)
train_p_cty, test_p_cty = train_test(p_cty)
train_normalized_offers_, test_normalized_offers_ = train_test(normalized_offers)
train_normalized_offers, test_normalized_offers = train_test(max_normalized_offers)
N=len(train_status)
dim1 = len(set(product_id))
dim2 = len(set(country))
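# Hierarchical logistic regression with shared data containers, i.e.
#   logit(p) = intercept + alpha_product[p] + alpha_country[c]
#              + (beta_product[p] + beta_country[c]) * normalized_offer
# pm.Data wrappers let the test split be swapped in later via pm.set_data.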
with pm.Model() as shared_data_model:
    intercept = pm.Normal('intercept', mu=0, sd=1)
    alpha_product = pm.Normal('alpha_product', mu=0, sd=1, shape=dim1)
    alpha_country = pm.Normal('alpha_country', mu=0, sd=1, shape=dim2)
    beta_product = pm.Normal('beta_product', mu=0, sd=10, shape=dim1)
    beta_country = pm.Normal('beta_country', mu=0, sd=10, shape=dim2)
    train_cty = pm.Data("train_cty", train_country)
    train_p = pm.Data("train_p", train_product_id)
    train_offers = pm.Data("train_offers", train_normalized_offers)
    # Registered only so it can be swapped alongside the others at prediction
    # time; renamed to avoid shadowing the numpy array `train_p_cty`.
    train_p_cty_data = pm.Data("train_p_cty", train_p_cty)
    # pm.math.invlogit keeps the link function inside the theano graph.
    p = pm.math.invlogit(intercept
                         + alpha_product[train_p]
                         + alpha_country[train_cty]
                         + beta_product[train_p] * train_offers
                         + beta_country[train_cty] * train_offers)
    pm.Bernoulli('y', p=p, observed=train_status)
    trace = pm.sample(init='advi+adapt_diag', n_init=50_000,
                      tune=1000, draws=3000, chains=3, cores=8,
                      target_accept=0.9, max_treedepth=10)
az.plot_trace(trace, compact=True); plt.show()
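# Posterior predictive check on the training data: threshold the mean
# predicted conversion probability at 0.5 and score against the labels.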
with shared_data_model:
    pp_y = pm.sample_posterior_predictive(trace)["y"]
print('Accuracy:', accuracy_score(train_status, np.where(pp_y.mean(0) > 0.5, 1, 0)))
# Same plot using the z-score-normalized offers instead:
# plt.plot(train_normalized_offers_, train_status, "k.", alpha=0.15)
# plt.scatter(train_normalized_offers_, pp_y.mean(0), s=1)
# az.plot_hdi(train_normalized_offers_, pp_y, hdi_prob=0.6, fill_kwargs={"alpha": 0.5})
# az.plot_hdi(train_normalized_offers_, pp_y, fill_kwargs={"alpha": 0.2})
plt.plot(train_normalized_offers, train_status, "k.", alpha=0.15)
plt.scatter(train_normalized_offers, pp_y.mean(0), s=1)
az.plot_hdi(train_normalized_offers, pp_y, hdi_prob=0.55, smooth=True, fill_kwargs={"alpha": 0.5})
az.plot_hdi(train_normalized_offers, pp_y, hdi_prob=0.75, fill_kwargs={"alpha": 0.1})
hdi_55 = mpatches.Patch(color='orange', alpha=0.5, label='hdi: 55%')
hdi_75 = mpatches.Patch(color='orange', alpha=0.1, label='hdi: 75%')
obs_patch = mpatches.Patch(color='black', alpha=1, label='observations')  # renamed to avoid shadowing `status`
preds = mpatches.Patch(facecolor='C0', edgecolor='none', label='predictions')
plt.legend(handles=[hdi_55, hdi_75, obs_patch, preds])
plt.xlabel('Normalized price')
plt.ylabel('Probability of converted opp.')
plt.show()
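# Out-of-sample prediction: swap the shared containers for the test split and
# reuse the same trace for posterior predictive draws.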
with shared_data_model:
    pm.set_data({"train_cty": test_country, "train_p": test_product_id,
                 "train_p_cty": test_p_cty, "train_offers": test_normalized_offers})
    pp_y_new = pm.sample_posterior_predictive(trace, 2000)
print('Won customers:', sum(np.where(pp_y_new['y'].mean(0) > 0.5, 1, 0)))
# Same plot for the test split:
# plt.plot(test_normalized_offers, test_status, "k.", alpha=0.15)
# plt.scatter(test_normalized_offers, pp_y_new['y'].mean(0), s=1)
# az.plot_hdi(test_normalized_offers, pp_y_new['y'], hdi_prob=0.6, fill_kwargs={"alpha": 0.5})
# az.plot_hdi(test_normalized_offers, pp_y_new['y'], fill_kwargs={"alpha": 0.2})
print(pm.trace_to_dataframe(trace).std(0).round(3))  # posterior std. dev. per parameter
arr = pp_y_new['y']
idx = arr.mean(0) >= 0.5
predicted_conversions = arr.mean(0)[idx]
variance_conversions = arr.std(0)[idx]
predicted_duds = arr.mean(0)[~idx]
variance_duds = arr.std(0)[~idx]
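# Split the test predictions at 0.5 into likely conversions and likely duds,
# then plot posterior mean vs. posterior std. dev.; points inside the red
# semicircle are the near-coin-flip, high-uncertainty calls.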
circle1 = plt.Circle((0.5, 0), 0.5, color='r', alpha=0.1)
fig, ax = plt.subplots()
plt.scatter(predicted_conversions, variance_conversions, alpha=0.5, c=test_status[idx], label='Converted')
plt.scatter(predicted_duds, variance_duds, alpha=0.5, c=test_status[~idx], label='Not converted')
ax.add_patch(circle1)
plt.ylim(0, 0.55)
plt.xlabel('Conversion posterior probability')
plt.ylabel('Posterior std. dev.')
conv_patch = mpatches.Patch(color='orange', alpha=0.5, label='converted')
not_conv_patch = mpatches.Patch(color='purple', label='not converted')
plt.legend(handles=[conv_patch, not_conv_patch])
plt.show()
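# Discount sweep: scale test offers by multipliers below 1, re-score the
# opportunities previously predicted as lost (~idx), and count how many flip
# to won at each discount level (colored by posterior std. dev.).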
multiplier_lst = [i/100 for i in range(1, 100) if i%3==0]
won_opps_lst = list()
probs = list()
with tqdm(total=len(multiplier_lst)) as pbar:
    for multiplier in multiplier_lst:
        with shared_data_model:
            pm.set_data({"train_cty": test_country, "train_p": test_product_id,
                         "train_p_cty": test_p_cty,
                         "train_offers": test_normalized_offers * multiplier})
            pp_y_new = pm.sample_posterior_predictive(trace, 200, progressbar=False)
        arr = pp_y_new['y'].mean(0)[~idx]
        won_opps_lst.append(sum(np.where(arr > 0.5, 1, 0)))
        # Equals np.std(arr) whenever at least one opportunity stays below
        # 0.5, and NaN otherwise (cleaned up below).
        probs.append(sum(np.where(arr < 0.5, np.std(arr), 0)) / sum(np.where(arr < 0.5, 1, 0)))
        pbar.update(1)
probs = [0 if isnan(x) else x for x in probs]  # 0/0 above yields NaN when nothing stays below 0.5
bound = int(max(probs) * 100)
multiplier_lst = [100 - 100 * i for i in multiplier_lst]  # price multiplier -> discount in %
colormap = mpl.cm.copper
norm = mpl.colors.Normalize(vmin=0, vmax=bound)
sm = plt.cm.ScalarMappable(cmap=colormap, norm=norm)
colors = [colormap(int(255/bound)*i) for i in range(bound+1)]
probs1 = [colors[int(100 * p)] for p in probs]
plt.scatter(multiplier_lst, won_opps_lst, c=probs1)
plt.ylabel('Converted opportunities')
plt.xlabel('Discount on offered unit price (%)')
plt.colorbar(sm)
plt.show()
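# Mark-up sweep: the mirror experiment with multipliers above 1, counting
# previously won opportunities (idx) that flip to lost as prices rise.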
multiplier_lst = [i/100 for i in range(100, 300) if i%6==0 ]
lost_opps_lst = list()
probs = list()
with tqdm(total=len(multiplier_lst)) as pbar:
    for multiplier in multiplier_lst:
        with shared_data_model:
            pm.set_data({"train_cty": test_country, "train_p": test_product_id,
                         "train_p_cty": test_p_cty,
                         "train_offers": test_normalized_offers * multiplier})
            pp_y_new = pm.sample_posterior_predictive(trace, 200, progressbar=False)
        arr = pp_y_new['y'].mean(0)[idx]
        lost_opps_lst.append(sum(np.where(arr < 0.5, 1, 0)))
        probs.append(sum(np.where(arr < 0.5, np.std(arr), 0)) / sum(np.where(arr < 0.5, 1, 0)))
        pbar.update(1)
probs = [0 if isnan(x) else x for x in probs]  # guard: no lost opps. at low mark-ups -> NaN
bound = int(max(probs) * 100)
colormap = mpl.cm.copper
norm = mpl.colors.Normalize(vmin=0, vmax=bound)
sm = plt.cm.ScalarMappable(cmap=colormap, norm=norm)
colors = [colormap(int(255/bound)*i) for i in range(bound+1)]
multiplier_lst = [100 * i - 100 for i in multiplier_lst]  # price multiplier -> mark-up in %, mirroring the discount conversion above
probs1 = [colors[int(100 * p)] for p in probs]
plt.scatter(multiplier_lst, lost_opps_lst, c=probs1)
plt.ylabel('Lost opportunities')
plt.xlabel('Mark-up on offered unit price (%)')
plt.colorbar(sm)
plt.show()