# Inventory Reshuffle
#
# mail@pastecode.io avatar
# unknown
# ruby
# 24 days ago
# 13 kB
# 8
# Indexable
# Never
require '/usr/share/fk-sc-warehouse-b2b/config/boot'
require '/usr/share/fk-sc-warehouse-b2b/app/LA/helpers/inventory_swap_helper'
require 'binaryheap'
require '/usr/share/fk-sc-warehouse-b2b/scripts/sc_metrics_publisher'
require 'digest'
require_relative 'inventory_reshuffle_email_publisher'


# Collects bulk-location reservations per reservation type and dispatch-by
# date ("dbd"), aggregates them per WID, distributes the WIDs over buckets and
# publishes every bucket as a series of size-limited messages.
class InventoryReshuffleV2

  CUSTOMER_OUTBOUND = "customer_outbound"
  IWIT = "iwit"
  CUTOFFS_KEY = "candidate_no_of_cutoff_for_swap"
  TIME_CUTOFF_KEY = "candidate_time_for_swap"

  # Runs one reshuffle pass for a warehouse inside that warehouse's shard.
  #
  # warehouse_id - identifier passed to the shard manager, the view service
  #                factory, the metric names and the publish URL.
  # config       - Hash read with string keys: an optional CUSTOMER_OUTBOUND
  #                and/or IWIT sub-hash (each providing CUTOFFS_KEY),
  #                'use_hashing', 'max_parallelism' and
  #                'reservation_batch_size'.
  # cron_id      - value copied into every published payload.
  def self.reshuffle(warehouse_id, config, cron_id)
    ShardManager.new.using_shard_for_warehouse_id(warehouse_id) do
      inventory_view_service = BusinessServiceFactory.get_inventory_bulk_location_view_service(warehouse_id)

      cob_dbd_wids_map = reservations_by_dispatch_date(inventory_view_service, config, CUSTOMER_OUTBOUND)
      iwit_dbd_wids_map = reservations_by_dispatch_date(inventory_view_service, config, IWIT)

      wid_hash = {}
      get_priority(cob_dbd_wids_map.keys, iwit_dbd_wids_map.keys).each do |priority_hash|
        wid_res_map = case priority_hash[:reservation_type]
                      when IWIT
                        iwit_dbd_wids_map[priority_hash[:dbd]]
                      when CUSTOMER_OUTBOUND
                        cob_dbd_wids_map[priority_hash[:dbd]]
                      else
                        logger.error("Not a valid reservation type")
                        {}
                      end

        wid_res_map.each do |wid, res|
          res[:priority] = priority_hash[:priority]
          (wid_hash[wid] ||= { "aggregated_reservations" => [] })["aggregated_reservations"] << res
        end
      end

      wid_hash.each_value do |value|
        value["total_count"] = value["aggregated_reservations"].collect { |res| res[:reservation_count] }.reduce(0, :+)
      end

      buckets = if config['use_hashing']
                  get_buckets_by_hash(wid_hash, config['max_parallelism'], warehouse_id)
                else
                  get_wids_filled_buckets(wid_hash, get_bucket_count(wid_hash, config))
                end
      buckets.each { |bucket| publish(wid_hash, bucket, config["reservation_batch_size"], warehouse_id, cron_id) }
    end
  end

  # Builds { dispatch_by_date => { wid => reservation summary } } for one
  # reservation type, limited to the first CUTOFFS_KEY dispatch-by dates the
  # view service returns. Returns {} when config has no section for the type.
  #
  # The cutoff is read from the section of the SAME reservation type; the
  # IWIT branch previously read the CUSTOMER_OUTBOUND section by mistake.
  def self.reservations_by_dispatch_date(inventory_view_service, config, reservation_type)
    return {} unless config.key?(reservation_type)

    cutoffs = config[reservation_type][CUTOFFS_KEY]
    dbd_rows = inventory_view_service.get_dispatch_by_dates_for_bulk_reservations(reservation_type).first(cutoffs)

    dbd_rows.each_with_object({}) do |dbd_row, dbd_wids_map|
      dispatch_by_date = dbd_row[:dispatch_by_date]
      rows_by_wid = inventory_view_service
                      .get_reservation_id_and_wid_for_bulk_location_inventory(reservation_type, dispatch_by_date)
                      .group_by { |res| res[:wid] }

      dbd_wids_map[dispatch_by_date] = rows_by_wid.map do |wid, reservations|
        [wid, {
            :reservation_type => reservation_type,
            :dispatch_by_date => dispatch_by_date,
            :wid => wid,
            :reservation_ids => reservations.collect { |res| res[:id] },
            :reservation_count => reservations.count
        }]
      end.to_h
    end
  end
  private_class_method :reservations_by_dispatch_date

  # Distributes WIDs over bucket_count buckets: WIDs are taken in descending
  # "total_count" order and each one goes to the bucket the heap ejects (the
  # heap comparator is reversed, i.e. child <=> parent).
  # Returns the heap's bucket array.
  def self.get_wids_filled_buckets(wid_hash, bucket_count)
    wids_by_load = wid_hash.sort_by { |_wid, value| -value["total_count"] }
    min_heap = BinaryHeap.new { |parent, child| child <=> parent }
    (1..bucket_count).each { |i| min_heap.insert(Bucket.new('bucket_' + i.to_s)) }

    wids_by_load.each do |wid, value|
      bucket = min_heap.eject
      bucket.rank += value["total_count"]
      bucket.items << wid
      min_heap.insert bucket
    end

    # Bucket#print only returns the description, so map (not each) is needed
    # for puts to show it.
    puts min_heap.data.map { |bucket| bucket.print }
    min_heap.data
  end

  # Assigns every WID to bucket index (first 16 hex digits of MD5(wid)) modulo
  # bucket_count, so only indexes that receive a WID get a bucket. Emits one
  # INV_SWAP_COUNT_ metric per bucket and one INV_SWAP_SKEW_ metric
  # (max rank / integer average rank). Returns the array of buckets.
  def self.get_buckets_by_hash(wid_hash, bucket_count, warehouse_id)
    bucket_hash = {}
    total = 0
    max = 0

    wid_hash.each do |wid, value|
      index = Digest::MD5.hexdigest(wid)[0, 16].to_i(16) % bucket_count
      bucket = (bucket_hash[index] ||= Bucket.new('bucket_' + index.to_s))
      bucket.items << wid
      bucket.rank += value["total_count"]
      max = bucket.rank if bucket.rank > max
      total += value["total_count"]
    end

    bucket_hash.each_value do |bucket|
      ScMetrics.count('INV_SWAP_COUNT_' + warehouse_id, bucket.rank)
    end

    begin
      # No buckets, or an integer average of zero, would divide by zero; the
      # skew metric is skipped in that case instead of relying on a rescue.
      average_rank = bucket_hash.empty? ? 0 : total / bucket_hash.size
      ScMetrics.count('INV_SWAP_SKEW_' + warehouse_id, max / average_rank) unless average_rank.zero?
    rescue StandardError => e
      # The skew metric stays best-effort: a failure here must not abort the run.
      warn "INV_SWAP_SKEW metric not published: #{e.message}"
    end

    bucket_hash.values
  end

  # Number of buckets to use: the number of batches needed for all
  # reservations (total / 'reservation_batch_size', rounded up), capped by
  # 'max_parallelism' and by the number of WIDs.
  def self.get_bucket_count(wid_hash, config)
    wid_count = wid_hash.keys.count
    total_reservations = wid_hash.collect { |_wid, value| value["total_count"] }.reduce(0, :+)
    total_batch = (total_reservations.to_f / config["reservation_batch_size"]).ceil
    [total_batch, config['max_parallelism'], wid_count].min
  end

  # Merges the two lists of dispatch-by dates into a single ascending list and
  # numbers the entries from 1. On equal dates the CUSTOMER_OUTBOUND entry
  # comes first. A nil date is treated as "list exhausted".
  #
  # Returns an array of { :priority, :dbd, :reservation_type } hashes.
  def self.get_priority(cob_dbds, iwit_dbds)
    cob_queue = cob_dbds.sort
    iwit_queue = iwit_dbds.sort
    cob_dbd = cob_queue.shift
    iwit_dbd = iwit_queue.shift
    priority_array = []

    until cob_dbd.nil? && iwit_dbd.nil?
      take_iwit = cob_dbd.nil? || (!iwit_dbd.nil? && cob_dbd > iwit_dbd)
      priority = priority_array.size + 1

      if take_iwit
        priority_array << { :priority => priority, :dbd => iwit_dbd, :reservation_type => IWIT }
        iwit_dbd = iwit_queue.shift
      else
        priority_array << { :priority => priority, :dbd => cob_dbd, :reservation_type => CUSTOMER_OUTBOUND }
        cob_dbd = cob_queue.shift
      end
    end

    priority_array
  end

  # Publishes the reservations of every WID in bucket as JSON messages of the
  # form { "reservations" => { wid => [...] }, "cron_id" => cron_id }, posting
  # a message each time batch_threshold reservations have been collected. A
  # reservation group that does not fit is split across messages.
  #
  # Side effect: a split shifts ids out of the group's :reservation_ids and
  # lowers its :reservation_count in wid_hash.
  #
  # Raises ArgumentError when batch_threshold is not a positive number (the
  # split loop could never make progress).
  def self.publish(wid_hash, bucket, batch_threshold, warehouse_id, cron_id)
    unless batch_threshold.is_a?(Numeric) && batch_threshold > 0
      raise ArgumentError, "batch_threshold must be a positive number, got #{batch_threshold.inspect}"
    end

    message_publisher = MessagePublisherFactory.message_publisher(SERVICE_NAME, 'wms_inventory_swap')
    group_id = warehouse_id + bucket.name
    url = UrlBuilderLib::UrlBuilder.base_url(SupplyChain.warehouse_vip1).add_path('warehouses')
              .add_path(warehouse_id).add_path('inventory').add_path('reshuffle').add_path('v2').build

    batch_reservation_count = 0
    payload = { "reservations" => {}, "cron_id" => cron_id }

    bucket.items.each do |wid|
      payload["reservations"][wid] = []

      # The groups carry their priority under the symbol key :priority.
      groups = wid_hash[wid]["aggregated_reservations"].sort_by { |res| res[:priority] }
      groups.each do |reservations|
        loop do
          room = batch_threshold - batch_reservation_count

          if reservations[:reservation_count] > room
            # Only part of the group fits: move `room` ids into this message
            # and leave the remainder on the group for the next message.
            chunk = reservations.merge(
              :reservation_ids => reservations[:reservation_ids].shift(room),
              :reservation_count => room
            )
            reservations[:reservation_count] -= room
            payload["reservations"][wid] << chunk
            batch_reservation_count = batch_threshold
            remainder_pending = true
          else
            payload["reservations"][wid] << reservations
            batch_reservation_count += reservations[:reservation_count]
            remainder_pending = false
          end

          if batch_reservation_count >= batch_threshold
            message_publisher.post(url, payload.to_json, group_id)
            batch_reservation_count = 0
            payload = { "reservations" => { wid => [] }, "cron_id" => cron_id }
          end

          break unless remainder_pending
        end
      end
    end

    # Flush what is left, but never post a message that carries no reservation.
    if payload["reservations"].values.any? { |list| !list.empty? }
      message_publisher.post(url, payload.to_json, group_id)
    end
  end

end

# Writes ten "<unix time> <metric_name>.count <count> " lines to file; the
# timestamp is taken again for every line.
def publish_metrics(metric_name, count, file)
  (1..10).each do
    file.write("#{Time.now.to_i} #{metric_name}.count #{count} \n")
  end
end

# A named list of items with a numeric rank. Buckets are ordered (and, via
# Comparable, considered equal) by rank alone.
class Bucket
  include Comparable

  attr_accessor :name, :items, :rank

  # Starts with no items and a rank of 0.
  def initialize(name)
    @name = name
    @items = []
    @rank = 0
  end

  # Ordering used by Comparable: compares ranks only.
  def <=>(other_bucket)
    rank <=> other_bucket.rank
  end

  # Returns a one-line description of the bucket; nothing is written to
  # stdout despite the method name.
  def print
    format('Bucket : %s , Items: %s, Rank: %s', name, items, rank)
  end
end
# Runs one reshuffle pass over every warehouse listed in
# inventory_reshuffle_config_v2, skipping warehouses that also appear in
# inventory_reshuffle_config (the v1 config).
#
# A failure for one warehouse is logged, counted in the
# 'inventory_reshuffle_v2.failure' metric and does not stop the remaining
# warehouses. Only StandardError is rescued: Interrupt, SignalException and
# SystemExit propagate, so the process can still be stopped while the
# surrounding endless loop is running.
def inventory_cron
  # NOTE(review): captured once per pass, so the timing metric emitted after
  # each warehouse is the time elapsed since the pass started, not the time
  # spent on that warehouse alone - confirm this is intended.
  report_start_time = Time.now
  reshuffle_config = WarehouseService.settings.inventory_reshuffle_config_v2
  reshuffle_config_v1 = WarehouseService.settings.inventory_reshuffle_config
  cron_id = Time.now.to_i.to_s + '_inventory_swap'
  reshuffle_config.keys.each do |warehouse_id|
    next if reshuffle_config_v1.has_key?(warehouse_id)
    begin
      logger.info "Inventory_reshuffle_v2 job started at #{Time.now}"
      InventoryReshuffleV2.reshuffle(warehouse_id, reshuffle_config[warehouse_id], cron_id)
      ScMetrics.increment('inventory_reshuffle_v2.success')
    rescue StandardError => e
      logger.info "INVENTORT_RESHUFFLE_V2_EXCEPTION_#{Time.now}"
      logger.info e.message
      logger.info e.backtrace
      # File.open("/var/log/flipkart/supply-chain/fk-sc-warehouse-b2b/INVENTORT_RESHUFFLE_EXCEPTION_V2_#{cron_id}", 'a') { |file| file.write("#{warehouse_id} :=> #{e.message} \n #{e.backtrace} \n\n") }
      # message_subject = "Inventory Reshuffle V2 Cron Failed!!! for #{warehouse_id}"
      # message_body =  "Cron Failed, \n exception is \n #{e.backtrace} "
      # EmailPublisher.send_mail(message_subject,message_body)
      ScMetrics.increment('inventory_reshuffle_v2.failure')
    ensure
      report_end_time = Time.now
      report_execution_time = (report_end_time - report_start_time) * 1000
      ScMetrics.timing('inventory_reshuffle.time', report_execution_time)
    end
  end
end

# Runs inventory_cron forever, sleeping 1800 seconds after every pass.
# Never returns; it only ends if an exception escapes inventory_cron or sleep.
def run_inventory_reshuffle_cron
  pause_seconds = 1800
  while true
    inventory_cron
    sleep pause_seconds
  end
end
# File.open("/var/log/flipkart/supply-chain/fk-sc-warehouse-b2b/inventory_reshuffle_v2_metric_file.log",'w') do |file|
#   publish_metrics('inventory_reshuffle_v2.success',success_count,file)
#   publish_metrics('inventory_reshuffle_v2.failure',failure_count,file)
# end
# Leave a Comment