/**
* Copyright (c) 2024, SWGY, Inc. <ron@sw.gy>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or (at
* your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
/*
* This program models the sequential decision problem of deciding how much
* sausage to order for a sausage-selling pizza restaurant.
*
*/
#include <assert.h>
#include <err.h>
#include <errno.h>
#include <getopt.h>
#include <limits.h>
#include <math.h>
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#ifndef __OpenBSD__
#include <time.h> /* required to seed the prng */
#endif
#include "math.h"
#include "wq.h"
#define MAX(X, Y) ((X) > (Y) ? (X) : (Y))
#define MIN(X, Y) ((X) < (Y) ? (X) : (Y))
#ifndef NUM_WORKERS
#define NUM_WORKERS 4
#endif
struct sim_params {
/* Number of time periods for which each parameter set should be
* evaluated. */
int num_time_periods;
/* The largest values theta_min and theta_max should take on during
* the simulation */
int theta_min_ceiling, theta_max_ceiling;
/* The starting value of theta min */
int theta_min_floor; /* theta_max_floor = theta_min at each iteration */
};
struct initial_state {
/* Price per sausage charged to a customer */
double p;
/* Cost to purchase a unit of sausage from our supplier */
double c;
/* Stochastic demand will be normally distributed about this mean */
double mean_demand;
/* Demand will be normally distributed using this standard deviation */
double demand_std;
/* minimum shipping fee applied to small orders */
double min_shipping;
/* orders exceeding this amount have no shipping charge applied */
double free_shipping;
};
struct dynamic_state {
int r_inv;
};
struct decision {
int order_amt;
};
struct exo_info {
int demand;
};
/*
* For the given policy parameters, `profit` was realized.
*/
struct sim_result {
double profit;
int theta_min, theta_max;
};
/*
* Each thread will be provided the following data
*/
struct thread_data {
struct sim_params p;
struct initial_state s0;
struct exo_info *W;
/* The task queue */
struct wq_head *wq;
pthread_t t;
/* OpenMPI rank - uniquely identifies this MPI task */
int rank;
};
#ifdef __OpenBSD__
__dead void
#else
__attribute__((__noreturn__)) void
#endif
print_usage(const char *n) {
printf("usage:%s [-tv] -d MEAN_DEMAND -s DEMAND_STD\n"
"\t[--num-time-periods=periods] [--purchase-cost=cost]\n"
"\t[--sale-price=price] [--theta-max-ceiling=max]\n"
"\t[--theta-min-ceiling=max]\n", n);
exit(1);
}
/* Protects the work queue */
static pthread_barrier_t wq_barrier;
static int verbose = 0;
static int terse = 0;
/**
* Given the current state, a decision, and a set of exogenous information,
* calculate the next state and place it in s_out.
*
* returns -1 on error.
*/
int transition(const struct initial_state *s0, const struct dynamic_state *s,
const struct decision *x, const struct exo_info *W,
struct dynamic_state *s_out);
/**
* Calculate the objective function value (profit in this case) given the
* provided parameters.
*/
double objective(const struct initial_state *s0, const struct dynamic_state *s,
const struct decision *x, double demand);
/**
* Apply the 'order up to' policy given the current simulation dynamic state.
*
* returns the order amount.
*/
int order_up_to(const struct dynamic_state *s, int min, int max);
/*
* Given a starting point (theta min) and simulation data, simulate
* each pair of (theta_min, theta_max) parameters from theta_max = theta_min
* through theta_max = theta_max_ceiling. Resulting objective values are
* written to `values_out`.
*/
void simulate_theta_min(int theta_min, const struct sim_params p,
const struct initial_state s0, const struct exo_info *W,
double *values_out, int rank);
/* Worker function: While task queue has contents, process those contents */
void *work_func(void *vp);
/* Given a set of sim_result instances, write the best instance to `out`.
* return non-zero on error. */
int locate_best_result(struct sim_result *results, size_t size,
struct sim_result *out);
#define SAUSAGE_MAX 1000000 /* max inventory size */
#define T_MAX 10000000 /* largest number of time periods allowed */
/* long option parsing plumbing */
#define OP_PURCH_COST 1000
#define OP_SALE_PRICE 1001
#define OP_THETA_MAX 1002
#define OP_THETA_MIN 1003
#define OP_TIME_PRD 1004
static struct option longopts[] = {
{ "demand-std-dev", required_argument, NULL, 's' },
{ "mean-demand", required_argument, NULL, 'd' },
{ "num-time-periods", required_argument, NULL, OP_TIME_PRD },
{ "purchase-cost", required_argument, NULL, OP_PURCH_COST },
{ "sale-price", required_argument, NULL, OP_SALE_PRICE },
{ "terse", no_argument, NULL, 't' },
{ "theta-max-ceiling", required_argument, NULL, OP_THETA_MAX },
{ "theta-min-ceiling", required_argument, NULL, OP_THETA_MIN },
{ "verbose", no_argument, NULL, 'v' },
{ NULL, 0, NULL, 0 }
};
int
main(int argc, char **argv)
{
struct wq_head *wq;
struct sim_params p;
struct initial_state s0;
struct exo_info *W;
int rank, size;
struct thread_data workers[NUM_WORKERS];
double mean_demand, demand_stddev;
int ch, i, j, policy_idx;
int theta_min, theta_max;
int best_min, best_max;
int policy_val_width;
double best_policy_value;
double *policy_values;
struct sim_result result;
struct sim_result *result_win_buff;
struct exo_info *w_win_buff;
MPI_Win win;
struct task *t;
p.num_time_periods = 1000;
p.theta_min_floor = 0;
p.theta_min_ceiling = -1;
p.theta_max_ceiling = -1;
mean_demand = demand_stddev = 0;
policy_val_width = 0;
/* price charged per sausage */
s0.p = 2.99;
/* cost per sausage */
s0.c = 1.83;
/* Minimum charge for shipping supplies */
s0.min_shipping = 9.99;
/* Free shipping for all orders over this price */
s0.free_shipping = 50.00;
while ((ch = getopt_long(argc, argv, "d:s:vt", longopts, NULL)) != -1) {
switch (ch) {
case 'd':
mean_demand = atof(optarg);
break;
case 's':
demand_stddev = atof(optarg);
break;
case 'v':
verbose += 1;
terse = 0;
break;
case 't':
terse = 1;
verbose = 0;
break;
case OP_PURCH_COST:
s0.c = atof(optarg);
break;
case OP_SALE_PRICE:
s0.p = atof(optarg);
break;
case OP_THETA_MIN:
errno = 0;
#ifdef __OpenBSD__
if ((p.theta_min_ceiling = strtonum(optarg, 0,
SAUSAGE_MAX, NULL)) == 0 && errno)
err(1, "strtonum theta-min-ceil");
#else
p.theta_min_ceiling = atoll(optarg);
#endif
break;
case OP_THETA_MAX:
errno = 0;
#ifdef __OpenBSD__
if ((p.theta_max_ceiling = strtonum(optarg, 0,
SAUSAGE_MAX, NULL)) == 0 && errno)
err(1, "strtonum theta-max-ceil");
#else
p.theta_max_ceiling = atoll(optarg);
#endif
break;
case OP_TIME_PRD:
errno = 0;
#ifdef __OpenBSD__
if ((p.num_time_periods = strtonum(optarg, 1, T_MAX,
NULL)) == 0 && errno)
err(1, "strtonum num-time-periods");
#else
p.num_time_periods = atoll(optarg);
#endif
break;
default:
print_usage(*argv); /* does not return */
}
}
if (mean_demand == 0 || demand_stddev == 0)
print_usage(*argv); /* does not return */
if (setvbuf(stdout, NULL, _IONBF, 0) != 0)
err(1, "failed to set stdout to unbuffered");
#ifndef __OpenBSD__
srand(time(NULL));
#endif
/* If policy params were not set, assign values based on a heuristic */
if (p.theta_min_ceiling == -1)
p.theta_min_ceiling = mean_demand * 2;
if (p.theta_max_ceiling == -1)
p.theta_max_ceiling = MAX(mean_demand * 2, p.theta_min_ceiling);
if (p.theta_max_ceiling < p.theta_min_ceiling) {
fprintf(stderr, "theta-max-ceiling (%d) must be greater than or"
" equal to theta-min-ceiling (%d).\n", p.theta_max_ceiling,
p.theta_min_ceiling);
print_usage(*argv); /* does not return */
}
/* MPI Initialization */
if (MPI_Init_thread(&argc, &argv, MPI_THREAD_FUNNELED, &ch) != 0 ||
ch < MPI_THREAD_FUNNELED)
fprintf(stderr, "MPI_Init_thread");
if (MPI_Comm_size(MPI_COMM_WORLD, &size) != 0)
fprintf(stderr, "MPI_Comm_size");
if (MPI_Comm_rank(MPI_COMM_WORLD, &rank) != 0)
fprintf(stderr, "MPI_Comm_rank");
if (verbose)
fprintf(stderr, "[MPI] Process %d of %d.\n", rank, size);
/* parameter initialization */
wq = wq_init();
s0.mean_demand = mean_demand;
s0.demand_std = demand_stddev;
/* This window will first be used to communicate the single set of
* stochastic demand data. */
MPI_Win_allocate(sizeof(struct exo_info) * p.num_time_periods,
sizeof(struct exo_info), MPI_INFO_NULL, MPI_COMM_WORLD,
&w_win_buff, &win);
W = calloc(p.num_time_periods, sizeof(struct exo_info));
policy_values = calloc(p.theta_min_ceiling * p.theta_max_ceiling,
sizeof(double));
/* Store the "width" of this matrix before theta_max_ceiling is divided
* based on rank */
policy_val_width = p.theta_max_ceiling;
/* Generate the stochastic scenario data */
for (i = 0; i < p.num_time_periods; ++i) {
W[i].demand = (unsigned long) MAX(0,
normal(s0.mean_demand, s0.demand_std));
}
/* The first read epoch, synchronize */
MPI_Win_fence(0, win);
if (rank == 0) /* local access */
memcpy(w_win_buff, W, sizeof(struct exo_info) *
p.num_time_periods);
/* Divide the parameter search space based on rank */
p.theta_min_floor = rank * p.theta_min_ceiling / size;
p.theta_min_ceiling = (rank + 1) * p.theta_min_ceiling / size >
p.theta_min_ceiling ? p.theta_min_ceiling : (rank + 1) *
p.theta_min_ceiling / size;
if (verbose)
fprintf(stderr, "[MPI #%d] floor: %d, ceiling: %d\n", rank,
p.theta_min_floor, p.theta_min_ceiling);
/* Exogenous data copied to the window, synchronize prior to the
* other workers reading that data */
MPI_Win_fence(0, win);
if (rank != 0) { /* Workers must receive scenario data */
/* Read in W */
MPI_Get(W,
p.num_time_periods * sizeof(struct exo_info) / sizeof(int),
MPI_INT, 0, 0,
p.num_time_periods * sizeof(struct exo_info) / sizeof(int),
MPI_INT, win);
}
/* RMA complete, sync */
MPI_Win_fence(0, win);
if (MPI_Win_free(&win) != 0)
err(1, "MPI_Win_free");
/* At this point, s0, W, and sim_params will no longer change */
/* Load up the tasks */
if (verbose)
fprintf(stderr, "Creating tasks.\n");
for (i = p.theta_min_floor; i < p.theta_min_ceiling; ++i) {
t = calloc(1, sizeof(struct task));
t->theta_min = i;
t->v_out = policy_values + i * policy_val_width;
if (wq_q(wq, t) != 0)
err(1, "Task queue failed");
}
/* Create NUM_WORKERS workers */
if (verbose)
fprintf(stderr, "[MPI %d] Spawning %d workers\n", rank,
NUM_WORKERS);
if (pthread_barrier_init(&wq_barrier, NULL, NUM_WORKERS) != 0)
err(1, "pthread_barrier_init");
for (i = 0; i < NUM_WORKERS; i++) {
/* Each worker needs sim params, s0, W, and a pointer to wq */
memcpy(&workers[i].p, &p, sizeof(struct sim_params));
memcpy(&workers[i].s0, &s0, sizeof(struct initial_state));
workers[i].W = calloc(p.num_time_periods,
sizeof(struct exo_info));
memcpy(workers[i].W, W, p.num_time_periods *
sizeof(struct exo_info));
workers[i].wq = wq;
workers[i].rank = rank;
if (pthread_create(&workers[i].t, NULL, work_func, &workers[i])
!= 0)
err(1, "pthread_create");
}
/* wait until all workers finish */
for (i = 0; i < NUM_WORKERS; ++i)
if (pthread_join(workers[i].t, NULL) != 0)
err(1, "pthread join %d", i);
/* Now find the policy parameters that performed the best */
best_policy_value = 0;
best_min = best_max = 0;
for (i = p.theta_min_floor; i < p.theta_min_ceiling; ++i) {
for (j = i; j < policy_val_width; ++j) {
theta_min = i;
theta_max = j;
policy_idx = theta_min * policy_val_width + theta_max;
if (policy_values[policy_idx] > best_policy_value) {
best_policy_value = policy_values[policy_idx];
best_min = theta_min;
best_max = theta_max;
/* ensure min <= max */
best_min = MIN(best_min, best_max);
}
}
}
result.profit = best_policy_value;
result.theta_min = best_min;
result.theta_max = best_max;
if (verbose)
fprintf(stderr, "[MPI #%d] profit: %f\n", rank, result.profit);
/* Push results back to rank 0 in this window */
MPI_Win_allocate(sizeof(struct sim_result) * size,
sizeof(struct sim_result), MPI_INFO_NULL, MPI_COMM_WORLD,
&result_win_buff, &win);
/* Sync before the results are written */
MPI_Win_fence(0, win);
MPI_Put(&result, sizeof(struct sim_result) / sizeof(int),
MPI_INT, 0, rank, sizeof(struct sim_result) / sizeof(int),
MPI_INT, win);
/* Write has finished, sync again */
MPI_Win_fence(0, win);
/* Rank zero should locate the best result set */
if (rank == 0 && locate_best_result(result_win_buff, size,
&result) != 0)
err(1, "locate_best_result");
/* Print the simulation results */
if (rank == 0 && !terse)
printf("[MPI #%d] With avg demand of %.0f, order-up-to(min: %d,"
" max %d) performs the best\nYielding %.2f profit on"
" average over %d time periods.\n", rank, s0.mean_demand,
result.theta_min,
result.theta_max,
result.profit,
p.num_time_periods);
else if (rank == 0)
printf("%d, %d, %d\n", (int)s0.mean_demand,
result.theta_min, result.theta_max);
/* Cleanup */
for (i = 0; i < NUM_WORKERS; i++)
free(workers[i].W);
pthread_barrier_destroy(&wq_barrier);
wq_free(wq);
MPI_Win_free(&win);
MPI_Finalize(); /* Nothing to do really if it errors */
free(policy_values);
free(W);
exit(0);
}
int
transition(__attribute__((unused)) const struct initial_state * s0,
const struct dynamic_state *s, const struct decision *x,
const struct exo_info *W, struct dynamic_state *s_out)
{
/* Update inventory to reflect demand */
s_out->r_inv = MAX(0, s->r_inv + x->order_amt - W->demand);
assert(s_out->r_inv >= 0);
assert(s->r_inv >= 0);
return 0;
}
double
objective(const struct initial_state *s0, const struct dynamic_state *s,
const struct decision *x, double demand)
{
double result;
result = -s0->c * x->order_amt + s0->p *
MIN(s->r_inv + x->order_amt, demand);
if (x->order_amt == 0 || x->order_amt * s0->c > s0->free_shipping)
return result;
else
return result - s0->min_shipping;
}
int
order_up_to(const struct dynamic_state *s, int min, int max)
{
if (s->r_inv < min)
return max - s->r_inv;
else
return 0;
}
void
simulate_theta_min(int theta_min, const struct sim_params p,
const struct initial_state s0,
const struct exo_info *W, double *v_out, int rank)
{
struct dynamic_state states[2];
struct decision x;
int i, t;
for (i = theta_min; i < p.theta_max_ceiling; ++i) {
/* start with the average demand in inventory */
states[0].r_inv = s0.mean_demand;
states[1].r_inv = s0.mean_demand;
v_out[i] = 0;
for (t = 0; t < p.num_time_periods; ++t) {
assert(states[t%2].r_inv >= 0);
x.order_amt = order_up_to(&states[t%2], theta_min, i);
v_out[i] += objective(&s0, &states[t%2], &x,
W[t].demand);
transition(&s0, &states[t%2], &x, &W[t],
&states[(t+1)%2]);
}
if (verbose > 1)
fprintf(stderr, "[MPI #%d] v_out[i] sum = %f\n", rank,
v_out[i]);
v_out[i] = v_out[i] / p.num_time_periods;
/* TODO: make this output safe for multithreaded exec */
if (verbose > 1)
fprintf(stderr, "[MPI #%d] min %d and max %d yielded "
"value %.2f.\n", rank, theta_min, i,
v_out[i]);
}
}
void *
work_func(void *vp)
{
struct thread_data *td = vp;
struct task *t;
int r;
r = pthread_barrier_wait(&wq_barrier);
if (r != PTHREAD_BARRIER_SERIAL_THREAD && r != 0)
err(1, "pthread_barrier_wait");
while ((t = wq_pop(td->wq)) != NULL) {
simulate_theta_min(t->theta_min, td->p, td->s0, td->W,
t->v_out, td->rank);
free(t);
}
return NULL;
}
int
locate_best_result(struct sim_result *results, size_t size,
struct sim_result *out)
{
size_t i;
out->profit = 0;
out->theta_min = 0;
out->theta_max = 0;
for (i = 0; i < size; ++i) {
if (results[i].profit > out->profit) {
out->profit = results[i].profit;
out->theta_min = results[i].theta_min;
out->theta_max = results[i].theta_max;
}
}
return 0;
}