# Inventory Reshuffle
require '/usr/share/fk-sc-warehouse-b2b/config/boot'
require '/usr/share/fk-sc-warehouse-b2b/app/LA/helpers/inventory_swap_helper'
require 'binaryheap'
require '/usr/share/fk-sc-warehouse-b2b/scripts/sc_metrics_publisher'
require 'digest'
require_relative 'inventory_reshuffle_email_publisher'

# Collects bulk-location reservations per WID for one warehouse, ranks them by
# dispatch-by-date, distributes the WIDs over buckets and publishes each bucket
# as batched messages to the 'wms_inventory_swap' publisher.
class InventoryReshuffleV2
  CUSTOMER_OUTBOUND = "customer_outbound"
  IWIT = "iwit"
  CUTOFFS_KEY = "candidate_no_of_cutoff_for_swap"
  # Not read by this class at present; kept because it is a public constant.
  TIME_CUTOFF_KEY = "candidate_time_for_swap"

  # Runs one reshuffle pass for a warehouse inside that warehouse's shard.
  #
  # @param warehouse_id [String] warehouse identifier (concatenated into
  #   metric names and group ids, so it has to be a String)
  # @param config [Hash] per-warehouse settings. Keys read here:
  #   CUSTOMER_OUTBOUND / IWIT (each a hash holding CUTOFFS_KEY),
  #   'use_hashing', 'max_parallelism', 'reservation_batch_size'
  # @param cron_id [String] identifier copied into every published payload
  def self.reshuffle(warehouse_id, config, cron_id)
    ShardManager.new.using_shard_for_warehouse_id(warehouse_id) do
      inventory_view_service = BusinessServiceFactory.get_inventory_bulk_location_view_service(warehouse_id)
      cob_dbd_wids_map = build_dbd_wids_map(inventory_view_service, config, CUSTOMER_OUTBOUND)
      iwit_dbd_wids_map = build_dbd_wids_map(inventory_view_service, config, IWIT)

      wid_hash = aggregate_by_wid(cob_dbd_wids_map, iwit_dbd_wids_map)

      # Computed unconditionally (as before) so that a bad batch size fails
      # the same way whichever bucketing strategy is configured.
      bucket_count = get_bucket_count(wid_hash, config)
      buckets = if config['use_hashing']
                  get_buckets_by_hash(wid_hash, config['max_parallelism'], warehouse_id)
                else
                  get_wids_filled_buckets(wid_hash, bucket_count)
                end

      buckets.each do |bucket|
        publish(wid_hash, bucket, config["reservation_batch_size"], warehouse_id, cron_id)
      end
    end
  end

  # Builds { dispatch_by_date => { wid => reservation summary } } for one
  # reservation type, limited to the first `cutoffs` dispatch-by-dates that
  # the view service returns. Returns {} when the type is not configured.
  #
  # The cutoff is read from the config section of the SAME reservation type;
  # the previous inline code read the CUSTOMER_OUTBOUND section for IWIT too.
  def self.build_dbd_wids_map(inventory_view_service, config, reservation_type)
    return {} unless config.has_key?(reservation_type)

    cutoffs = config[reservation_type][CUTOFFS_KEY]
    dbd_to_bulk_res_count_array =
      inventory_view_service.get_dispatch_by_dates_for_bulk_reservations(reservation_type)

    dbd_to_bulk_res_count_array.shift(cutoffs).each_with_object({}) do |dbd_to_bulk_res_count, dbd_wids_map|
      dispatch_by_date = dbd_to_bulk_res_count[:dispatch_by_date]
      reservations_by_wid = inventory_view_service
                            .get_reservation_id_and_wid_for_bulk_location_inventory(reservation_type, dispatch_by_date)
                            .group_by { |res| res[:wid] }

      dbd_wids_map[dispatch_by_date] = reservations_by_wid.map do |wid, reservations|
        [wid, {
          reservation_type: reservation_type,
          dispatch_by_date: dispatch_by_date,
          wid: wid,
          reservation_ids: reservations.map { |res| res[:id] },
          reservation_count: reservations.count
        }]
      end.to_h
    end
  end

  # Merges both per-type maps into
  #   { wid => { "aggregated_reservations" => [summary, ...], "total_count" => Integer } }
  # Each summary is stamped with the :priority assigned by get_priority.
  def self.aggregate_by_wid(cob_dbd_wids_map, iwit_dbd_wids_map)
    wid_hash = {}

    get_priority(cob_dbd_wids_map.keys, iwit_dbd_wids_map.keys).each do |priority_hash|
      wid_res_map =
        case priority_hash[:reservation_type]
        when IWIT
          iwit_dbd_wids_map[priority_hash[:dbd]]
        when CUSTOMER_OUTBOUND
          cob_dbd_wids_map[priority_hash[:dbd]]
        else
          logger.error("Not a valid reservation type")
          {}
        end

      wid_res_map.each do |wid, res|
        res[:priority] = priority_hash[:priority]
        wid_hash[wid] ||= { "aggregated_reservations" => [] }
        wid_hash[wid]["aggregated_reservations"] << res
      end
    end

    wid_hash.each_value do |value|
      value["total_count"] =
        value["aggregated_reservations"].map { |res| res[:reservation_count] }.reduce(0, :+)
    end
    wid_hash
  end

  private_class_method :build_dbd_wids_map, :aggregate_by_wid

  # Distributes WIDs over `bucket_count` buckets: WIDs are taken in descending
  # "total_count" order and each one goes to the bucket ejected from the heap
  # (ordered by Bucket#rank through the `child <=> parent` comparator).
  #
  # @return the heap's backing collection of Bucket objects
  def self.get_wids_filled_buckets(wid_hash, bucket_count)
    wids_by_load = wid_hash.sort_by { |_wid, value| -value["total_count"] }
    min_heap = BinaryHeap.new { |parent, child| child <=> parent }
    (1..bucket_count).each { |i| min_heap.insert(Bucket.new("bucket_#{i}")) }

    wids_by_load.each do |wid, value|
      bucket = min_heap.eject
      bucket.rank += value["total_count"]
      bucket.items << wid
      min_heap.insert bucket
    end

    # Bucket#print only returns the description string, so it is puts-ed
    # explicitly; the old `puts data.each { ... }` printed the raw objects.
    min_heap.data.each { |bucket| puts bucket.print }
    min_heap.data
  end

  # Assigns each WID to bucket (MD5(wid) first 16 hex digits) % bucket_count
  # and publishes per-bucket load counts plus a max/average skew metric.
  #
  # @return [Array<Bucket>] only the buckets that received at least one WID
  def self.get_buckets_by_hash(wid_hash, bucket_count, warehouse_id)
    bucket_hash = {}
    total = 0
    max = 0

    wid_hash.each do |wid, value|
      index = Digest::MD5.hexdigest(wid)[0, 16].to_i(16) % bucket_count
      bucket = (bucket_hash[index] ||= Bucket.new("bucket_#{index}"))
      bucket.items << wid
      bucket.rank += value["total_count"]
      max = bucket.rank if bucket.rank > max
      total += value["total_count"]
    end

    bucket_hash.each_value do |bucket|
      ScMetrics.count("INV_SWAP_COUNT_#{warehouse_id}", bucket.rank)
    end

    begin
      # Integer division, as before. No buckets, or an average that rounds
      # down to zero, means there is no skew value to report.
      average = bucket_hash.empty? ? 0 : total / bucket_hash.size
      ScMetrics.count("INV_SWAP_SKEW_#{warehouse_id}", max / average) unless average.zero?
    rescue StandardError
      # The skew metric is best-effort: a metrics failure must not abort the
      # reshuffle. Signals and exits are no longer swallowed here.
    end

    bucket_hash.values
  end

  # Number of buckets to use: the number of batches needed for all
  # reservations, capped by config['max_parallelism'] and by the WID count.
  def self.get_bucket_count(wid_hash, config)
    total_reservations = wid_hash.map { |_wid, value| value["total_count"] }.reduce(0, :+)
    total_batch = (total_reservations.to_f / config["reservation_batch_size"]).ceil
    [total_batch, config['max_parallelism'], wid_hash.keys.count].min
  end

  # Merges the two dispatch-by-date lists into one ascending sequence and
  # numbers the entries from 1. On equal dates the CUSTOMER_OUTBOUND entry
  # comes first. A nil date is treated as "list exhausted", as before.
  #
  # @return [Array<Hash>] hashes with :priority, :dbd and :reservation_type
  def self.get_priority(cob_dbds, iwit_dbds)
    cob_queue = cob_dbds.sort
    iwit_queue = iwit_dbds.sort
    cob_dbd = cob_queue.shift
    iwit_dbd = iwit_queue.shift
    priority_array = []
    priority = 1

    until cob_dbd.nil? && iwit_dbd.nil?
      if cob_dbd.nil? || (!iwit_dbd.nil? && cob_dbd > iwit_dbd)
        priority_array << { priority: priority, dbd: iwit_dbd, reservation_type: IWIT }
        iwit_dbd = iwit_queue.shift
      else
        priority_array << { priority: priority, dbd: cob_dbd, reservation_type: CUSTOMER_OUTBOUND }
        cob_dbd = cob_queue.shift
      end
      priority += 1
    end

    priority_array
  end

  # Posts the reservations of every WID in `bucket` to the reshuffle v2 URL,
  # cutting the stream into payloads of `batch_threshold` reservations. A
  # reservation summary that does not fit the current payload is split: its
  # leading ids go out now, the remainder goes into the next payload.
  #
  # NOTE: the summaries stored in `wid_hash` are mutated while splitting
  # (:reservation_ids is shifted and :reservation_count reduced).
  def self.publish(wid_hash, bucket, batch_threshold, warehouse_id, cron_id)
    message_publisher = MessagePublisherFactory.message_publisher(SERVICE_NAME, 'wms_inventory_swap')
    group_id = warehouse_id + bucket.name
    url = UrlBuilderLib::UrlBuilder.base_url(SupplyChain.warehouse_vip1)
                                   .add_path('warehouses').add_path(warehouse_id)
                                   .add_path('inventory').add_path('reshuffle').add_path('v2')
                                   .build
    batch_reservation_count = 0
    payload = { "reservations" => {}, "cron_id" => cron_id }

    bucket.items.each do |wid|
      payload["reservations"][wid] = []

      # :priority is stored under a Symbol key; sorting on the String key
      # "priority" compared only nils and left the order to an unstable sort.
      prioritized = wid_hash[wid]["aggregated_reservations"].sort_by { |res| res[:priority] }

      prioritized.each do |reservations|
        is_reservation_available = false

        loop do
          if batch_reservation_count <= batch_threshold
            if batch_reservation_count + reservations[:reservation_count] > batch_threshold
              # Only part of this summary fits: carve off what fills the batch.
              batch_pending_count = batch_threshold - batch_reservation_count
              batch_pending_reservations = reservations.merge(
                reservation_ids: reservations[:reservation_ids].shift(batch_pending_count),
                reservation_count: batch_pending_count
              )
              reservations[:reservation_count] -= batch_pending_count
              payload["reservations"][wid] << batch_pending_reservations
              batch_reservation_count = batch_threshold
              is_reservation_available = true
            else
              payload["reservations"][wid] << reservations
              batch_reservation_count += reservations[:reservation_count]
              is_reservation_available = false
            end
          end

          if batch_reservation_count >= batch_threshold
            message_publisher.post(url, payload.to_json, group_id)
            batch_reservation_count = 0
            # The fresh payload keeps an entry for the WID being processed.
            payload = { "reservations" => { wid => [] }, "cron_id" => cron_id }
          end

          break unless is_reservation_available
        end
      end
    end

    message_publisher.post(url, payload.to_json, group_id) unless payload["reservations"].empty?
  end
end

# Writes the line "<epoch> <metric_name>.count <count> " to `file` ten times.
def publish_metrics(metric_name, count, file)
  (1..10).each do
    t = Time.now.to_i
    file.write("#{t} #{metric_name}.count #{count} \n")
  end
end

# A named group of WIDs. `rank` is the running total of reservation counts
# added to the bucket and is what buckets are compared by.
class Bucket
  include Comparable

  attr_accessor :name, :items, :rank

  def initialize(name)
    @name = name
    @rank = 0
    @items = []
  end

  def <=>(other_bucket)
    rank <=> other_bucket.rank
  end

  # Returns (does not output) a one-line description of the bucket.
  def print
    "Bucket : #{name} , Items: #{items}, Rank: #{rank}"
  end
end

# Runs InventoryReshuffleV2.reshuffle for every warehouse present in the v2
# config and absent from the v1 config, recording success/failure metrics.
# A failure for one warehouse does not stop the others.
def inventory_cron
  # NOTE(review): taken once, so the timing metric reported for each warehouse
  # is measured from the start of the whole run — confirm this is intended.
  report_start_time = Time.now
  reshuffle_config = WarehouseService.settings.inventory_reshuffle_config_v2
  reshuffle_config_v1 = WarehouseService.settings.inventory_reshuffle_config
  cron_id = Time.now.to_i.to_s + '_inventory_swap'
  # Only consumed by the commented-out metric-file block at the end of the file.
  success_count = 0
  failure_count = 0

  reshuffle_config.keys.each do |warehouse_id|
    next if reshuffle_config_v1.has_key?(warehouse_id)

    begin
      logger.info "Inventory_reshuffle_v2 job started at #{Time.now}"
      InventoryReshuffleV2.reshuffle(warehouse_id, reshuffle_config[warehouse_id], cron_id)
      ScMetrics.increment('inventory_reshuffle_v2.success')
      success_count += 1
    rescue StandardError => e
      # StandardError rather than Exception, so signals and exit requests can
      # still stop this long-running process.
      logger.info "INVENTORT_RESHUFFLE_V2_EXCEPTION_#{Time.now}"
      logger.info e.message
      logger.info e.backtrace
      # File.open("/var/log/flipkart/supply-chain/fk-sc-warehouse-b2b/INVENTORT_RESHUFFLE_EXCEPTION_V2_#{cron_id}", 'a') { |file| file.write("#{warehouse_id} :=> #{e.message} \n #{e.backtrace} \n\n") }
      # message_subject = "Inventory Reshuffle V2 Cron Failed!!! for #{warehouse_id}"
      # message_body = "Cron Failed, \n exception is \n #{e.backtrace} "
      # EmailPublisher.send_mail(message_subject,message_body)
      ScMetrics.increment('inventory_reshuffle_v2.failure')
      failure_count += 1
    ensure
      report_end_time = Time.now
      report_execution_time = (report_end_time - report_start_time) * 1000
      ScMetrics.timing('inventory_reshuffle.time', report_execution_time)
    end
  end
end

# Runs inventory_cron forever, sleeping 1800 seconds between runs.
def run_inventory_reshuffle_cron
  loop do
    inventory_cron
    sleep 1800
  end
end

# File.open("/var/log/flipkart/supply-chain/fk-sc-warehouse-b2b/inventory_reshuffle_v2_metric_file.log",'w') do |file|
#   publish_metrics('inventory_reshuffle_v2.success',success_count,file)
#   publish_metrics('inventory_reshuffle_v2.failure',failure_count,file)
# end
Leave a Comment