# Untitled
# unknown
# ruby
# a year ago
# 13 kB
# 17
# Indexable
require '/usr/share/fk-sc-warehouse-b2b/config/boot'
require '/usr/share/fk-sc-warehouse-b2b/app/LA/helpers/inventory_swap_helper'
require 'binaryheap'
require '/usr/share/fk-sc-warehouse-b2b/scripts/sc_metrics_publisher'
require 'digest'
require_relative 'inventory_reshuffle_email_publisher'
class InventoryReshuffleV2
CUSTOMER_OUTBOUND = "customer_outbound"
IWIT = "iwit"
CUTOFFS_KEY = "candidate_no_of_cutoff_for_swap"
TIME_CUTOFF_KEY = "candidate_time_for_swap"
# Builds the inventory-swap work for one warehouse and publishes it bucket by bucket.
#
# Inside the shard selected for +warehouse_id+ this method:
#   1. asks the inventory bulk-location view service, for every reservation
#      type present in +config+, for its dispatch-by-dates (dbd) and the
#      reservations of each dbd, grouped by wid;
#   2. ranks every (dbd, reservation type) pair with get_priority;
#   3. aggregates the ranked groups per wid and sums their reservation counts
#      into "total_count";
#   4. splits the wids into buckets (get_buckets_by_hash when
#      config['use_hashing'] is truthy, get_wids_filled_buckets otherwise) and
#      calls publish once per bucket.
#
# @param warehouse_id [String] warehouse whose shard is used
# @param config [Hash] per-warehouse settings; the keys read here are the
#   reservation-type sections, 'use_hashing', 'max_parallelism' and
#   "reservation_batch_size"
# @param cron_id [String] identifier forwarded unchanged to publish
def self.reshuffle(warehouse_id, config, cron_id)
  ShardManager.new.using_shard_for_warehouse_id(warehouse_id) do
    inventory_view_service = BusinessServiceFactory.get_inventory_bulk_location_view_service(warehouse_id)
    cob_dbd_wids_map = build_dbd_wids_map(inventory_view_service, config, CUSTOMER_OUTBOUND)
    iwit_dbd_wids_map = build_dbd_wids_map(inventory_view_service, config, IWIT)

    wid_hash = {}
    get_priority(cob_dbd_wids_map.keys, iwit_dbd_wids_map.keys).each do |priority_hash|
      wid_res_map = case priority_hash[:reservation_type]
                    when IWIT then iwit_dbd_wids_map[priority_hash[:dbd]]
                    when CUSTOMER_OUTBOUND then cob_dbd_wids_map[priority_hash[:dbd]]
                    else
                      logger.error("Not a valid reservation type")
                      {}
                    end
      wid_res_map.each do |wid, res|
        # The same hash object is stored in wid_hash, so publish later sees the priority.
        res[:priority] = priority_hash[:priority]
        wid_hash[wid] ||= { "aggregated_reservations" => [] }
        wid_hash[wid]["aggregated_reservations"] << res
      end
    end

    wid_hash.each_value do |value|
      value["total_count"] = value["aggregated_reservations"].collect { |res| res[:reservation_count] }.reduce(0, :+)
    end

    # Computed unconditionally (as before) so an unusable batch size fails
    # here, before anything is published.
    bucket_count = get_bucket_count(wid_hash, config)
    buckets = if config['use_hashing']
                get_buckets_by_hash(wid_hash, config['max_parallelism'], warehouse_id)
              else
                get_wids_filled_buckets(wid_hash, bucket_count)
              end
    buckets.each { |bucket| publish(wid_hash, bucket, config["reservation_batch_size"], warehouse_id, cron_id) }
  end
end

# Collects, for one reservation type, a map of
#   dispatch_by_date => { wid => reservation summary hash }.
# Returns an empty map when +config+ has no section for +reservation_type+.
# Only the first "cutoff" dispatch-by-dates returned by the service are used.
def self.build_dbd_wids_map(inventory_view_service, config, reservation_type)
  dbd_wids_map = {}
  return dbd_wids_map unless config.has_key?(reservation_type)

  cutoffs = swap_cutoff_for(config, reservation_type)
  dbd_rows = inventory_view_service.get_dispatch_by_dates_for_bulk_reservations(reservation_type)
  dbd_rows.first(cutoffs).each do |dbd_row|
    dispatch_by_date = dbd_row[:dispatch_by_date]
    rows = inventory_view_service.get_reservation_id_and_wid_for_bulk_location_inventory(reservation_type, dispatch_by_date)
    summaries = {}
    rows.group_by { |res| res[:wid] }.each do |wid, reservations|
      summaries[wid] = {
        :reservation_type => reservation_type,
        :dispatch_by_date => dispatch_by_date,
        :wid => wid,
        :reservation_ids => reservations.collect { |res| res[:id] },
        :reservation_count => reservations.count
      }
    end
    dbd_wids_map[dispatch_by_date] = summaries
  end
  dbd_wids_map
end

# Returns the cutoff (number of dispatch-by-dates to consider) for
# +reservation_type+. The type's own section wins; when it does not define a
# cutoff, the customer-outbound value is used, which is what every type
# received before the IWIT branch was fixed to read its own section.
def self.swap_cutoff_for(config, reservation_type)
  section = config[reservation_type]
  own = section.is_a?(Hash) ? section[CUTOFFS_KEY] : nil
  return own unless own.nil?

  fallback = config[CUSTOMER_OUTBOUND]
  fallback.is_a?(Hash) ? fallback[CUTOFFS_KEY] : nil
end
# Distributes wids over +bucket_count+ buckets, heaviest wid first, always
# adding the next wid to the bucket ejected from the heap (the heap is built
# with an inverted comparator over Bucket#<=>, which compares ranks).
#
# Prints one description line per bucket to stdout. The previous code passed
# the result of +each+ to +puts+, which threw the formatted strings away and
# printed the raw Bucket objects instead.
#
# @param wid_hash [Hash] wid => hash carrying a numeric "total_count"
# @param bucket_count [Integer] number of buckets to create
# @return [Array<Bucket>] the heap's backing array of filled buckets
def self.get_wids_filled_buckets(wid_hash, bucket_count)
  wids_by_load = wid_hash.sort_by { |_wid, value| -value["total_count"] }
  min_heap = BinaryHeap.new { |parent, child| child <=> parent }
  (1..bucket_count).each { |i| min_heap.insert(Bucket.new('bucket_' + i.to_s)) }

  wids_by_load.each do |wid, value|
    bucket = min_heap.eject
    bucket.rank += value["total_count"]
    bucket.items << wid
    min_heap.insert bucket
  end

  min_heap.data.each { |bucket| puts bucket.print }
  min_heap.data
end
# Assigns every wid to a bucket chosen from the first 16 hex digits of the
# wid's MD5 digest modulo +bucket_count+, so a given wid always maps to the
# same bucket index. Buckets are created lazily; indexes that no wid hashes
# to produce no bucket.
#
# Publishes one 'INV_SWAP_COUNT_<warehouse_id>' count per bucket and, when it
# can be computed, one 'INV_SWAP_SKEW_<warehouse_id>' count equal to the
# largest bucket rank divided by the (integer) average rank.
#
# @param wid_hash [Hash] wid (String) => hash carrying a numeric "total_count"
# @param bucket_count [Integer] modulus for the bucket index
# @param warehouse_id [String] suffix for the metric names
# @return [Array<Bucket>] the non-empty buckets
def self.get_buckets_by_hash(wid_hash, bucket_count, warehouse_id)
  bucket_hash = {}
  total = 0
  max = 0
  wid_hash.each do |wid, value|
    index = Digest::MD5.hexdigest(wid)[0, 16].to_i(16) % bucket_count
    bucket = (bucket_hash[index] ||= Bucket.new('bucket_' + index.to_s))
    bucket.items << wid
    bucket.rank += value["total_count"]
    max = bucket.rank if bucket.rank > max
    total += value["total_count"]
  end

  buckets = bucket_hash.values
  buckets.each { |bucket| ScMetrics.count('INV_SWAP_COUNT_' + warehouse_id, bucket.rank) }

  # The skew metric is undefined without buckets or when the integer average
  # is zero; skip it explicitly rather than relying on a swallowed
  # ZeroDivisionError.
  average = buckets.empty? ? 0 : total / buckets.size
  unless average.zero?
    begin
      ScMetrics.count('INV_SWAP_SKEW_' + warehouse_id, max / average)
    rescue StandardError
      # Best effort: a failure to publish the skew metric must not stop the
      # swap. Signals and exits are no longer swallowed here.
    end
  end

  buckets
end
# Decides how many buckets to use: the number of batches needed to hold every
# reservation (total "total_count" divided by config["reservation_batch_size"],
# rounded up), capped first by config['max_parallelism'] and then by the
# number of wids.
#
# @param wid_hash [Hash] wid => hash carrying a numeric "total_count"
# @param config [Hash] must provide "reservation_batch_size" and 'max_parallelism'
# @return [Integer] bucket count after both caps
def self.get_bucket_count(wid_hash, config)
  number_of_wids = wid_hash.keys.count
  reservation_total = wid_hash.collect { |_wid, details| details["total_count"] }.reduce(0, :+)
  batches_needed = (reservation_total.to_f / config["reservation_batch_size"]).ceil

  parallelism_cap = config['max_parallelism']
  chosen = batches_needed
  chosen = parallelism_cap if batches_needed > parallelism_cap
  chosen = number_of_wids if chosen > number_of_wids
  chosen
end
# Merges two lists of dispatch-by-dates into one ranking.
#
# Both lists are sorted (on copies; the arguments are not modified) and then
# merged smallest first. When both heads are equal the customer-outbound date
# is ranked before the IWIT one. Priorities start at 1 and increase by one per
# entry. A nil head is treated as the end of its list.
#
# @param cob_dbds [Array] customer-outbound dispatch-by-dates
# @param iwit_dbds [Array] IWIT dispatch-by-dates
# @return [Array<Hash>] hashes with :priority, :dbd and :reservation_type
def self.get_priority(cob_dbds, iwit_dbds)
  cob_queue = cob_dbds.sort
  iwit_queue = iwit_dbds.sort
  cob_head = cob_queue.shift
  iwit_head = iwit_queue.shift
  ranked = []

  until cob_head.nil? && iwit_head.nil?
    # Customer outbound goes next when IWIT is exhausted, or when both lists
    # still have a head and the outbound one is not strictly later.
    take_cob = iwit_head.nil? || (!cob_head.nil? && !(cob_head > iwit_head))
    if take_cob
      ranked << {
        :priority => ranked.size + 1,
        :dbd => cob_head,
        :reservation_type => CUSTOMER_OUTBOUND
      }
      cob_head = cob_queue.shift
    else
      ranked << {
        :priority => ranked.size + 1,
        :dbd => iwit_head,
        :reservation_type => IWIT
      }
      iwit_head = iwit_queue.shift
    end
  end

  ranked
end
# Posts the reservations of every wid in +bucket+ to the reshuffle v2 endpoint
# in batches of at most +batch_threshold+ reservations.
#
# Wids are processed in bucket order; within a wid the reservation groups are
# processed by ascending :priority. A group that does not fit into the room
# left in the current batch is split: as many reservation ids as fit go out
# with the current batch, the rest stays in the group for the next one.
#
# Fixes over the previous version:
#   * groups were sorted by the string key "priority", which is never set
#     (the priority lives under the symbol :priority), so the priority order
#     was not applied;
#   * when the last batch was filled exactly, a further message containing
#     only empty reservation lists was posted;
#   * a non-positive threshold made the split loop spin forever.
#
# NOTE: as before, the reservation hashes reachable from +wid_hash+ are
# modified in place when a group is split (:reservation_ids is shifted and
# :reservation_count reduced).
#
# @param wid_hash [Hash] wid => { "aggregated_reservations" => [group, ...] }
# @param bucket [Bucket] provides +items+ (wids) and +name+
# @param batch_threshold [Numeric] maximum reservations per message, > 0
# @param warehouse_id [String] used in the URL path and the group id
# @param cron_id [String] copied into every payload
# @raise [ArgumentError] if +batch_threshold+ is not a positive number
def self.publish(wid_hash, bucket, batch_threshold, warehouse_id, cron_id)
  unless batch_threshold.is_a?(Numeric) && batch_threshold > 0
    raise ArgumentError, "batch_threshold must be a positive number, got #{batch_threshold.inspect}"
  end

  message_publisher = MessagePublisherFactory.message_publisher(SERVICE_NAME, 'wms_inventory_swap')
  group_id = warehouse_id + bucket.name
  url = UrlBuilderLib::UrlBuilder.base_url(SupplyChain.warehouse_vip1).add_path('warehouses')
                                 .add_path(warehouse_id).add_path('inventory').add_path('reshuffle').add_path('v2').build

  payload = { "reservations" => {}, "cron_id" => cron_id }
  batch_reservation_count = 0

  bucket.items.each do |wid|
    payload["reservations"][wid] = []
    groups = wid_hash[wid]["aggregated_reservations"].sort_by { |group| group[:priority] }

    groups.each do |reservations|
      loop do
        room = batch_threshold - batch_reservation_count
        overflow = reservations[:reservation_count] > room

        if overflow
          # Fill the batch with the first +room+ ids; the group keeps the rest.
          head = reservations.merge(
            :reservation_ids => reservations[:reservation_ids].shift(room),
            :reservation_count => room
          )
          reservations[:reservation_count] -= room
          payload["reservations"][wid] << head
          batch_reservation_count = batch_threshold
        else
          payload["reservations"][wid] << reservations
          batch_reservation_count += reservations[:reservation_count]
        end

        if batch_reservation_count >= batch_threshold
          message_publisher.post(url, payload.to_json, group_id)
          batch_reservation_count = 0
          # Start the next batch with a slot for the wid being processed.
          payload = { "reservations" => { wid => [] }, "cron_id" => cron_id }
        end

        break unless overflow
      end
    end
  end

  # Flush the partially filled last batch, but only if it carries any work.
  has_pending = payload["reservations"].values.any? { |list| !list.empty? }
  message_publisher.post(url, payload.to_json, group_id) if has_pending
end
end
# Writes the line "<unix time> <metric_name>.count <count> " to +file+ ten
# times, taking a fresh timestamp for every line.
#
# @param metric_name [String] metric prefix; ".count" is appended
# @param count [Object] value interpolated into each line
# @param file [#write] destination for the lines
def publish_metrics(metric_name, count, file)
  (1..10).each do |_round|
    timestamp = Time.now.to_i
    file.write("#{timestamp} #{metric_name}.count #{count} \n")
  end
end
# A named collection of items with an accumulated rank.
# Buckets compare by rank only (via Comparable), so two buckets with the same
# rank are considered equal regardless of name or items.
class Bucket
  include Comparable

  attr_accessor :name, :items, :rank

  # @param name [String] label for the bucket; rank starts at 0, items empty
  def initialize(name)
    @name = name
    @rank = 0
    @items = []
  end

  # Orders buckets by their rank.
  def <=>(other_bucket)
    rank <=> other_bucket.rank
  end

  # Returns (does not output) a one-line description of the bucket.
  def print
    "Bucket : #{name} , Items: #{items}, Rank: #{rank}"
  end
end
# Runs InventoryReshuffleV2.reshuffle once for every warehouse present in the
# v2 reshuffle settings, skipping warehouses that also appear in the v1
# settings. All warehouses of one run share the same cron id
# ("<unix time>_inventory_swap").
#
# A failure for one warehouse is logged, counted as
# 'inventory_reshuffle_v2.failure' and does not stop the remaining warehouses.
# Only StandardError is handled: Interrupt, SystemExit and similar now
# propagate so the process can be stopped while a reshuffle is running
# (previously `rescue Exception` swallowed them).
#
# NOTE(review): the timing metric is measured from the start of the whole run,
# so the value reported after each warehouse is cumulative — confirm whether a
# per-warehouse duration was intended.
def inventory_cron
  report_start_time = Time.now
  reshuffle_config = WarehouseService.settings.inventory_reshuffle_config_v2
  reshuffle_config_v1 = WarehouseService.settings.inventory_reshuffle_config
  cron_id = Time.now.to_i.to_s + '_inventory_swap'

  reshuffle_config.keys.each do |warehouse_id|
    next if reshuffle_config_v1.has_key?(warehouse_id)

    begin
      logger.info "Inventory_reshuffle_v2 job started at #{Time.now}"
      InventoryReshuffleV2.reshuffle(warehouse_id, reshuffle_config[warehouse_id], cron_id)
      ScMetrics.increment('inventory_reshuffle_v2.success')
    rescue StandardError => e
      logger.info "INVENTORT_RESHUFFLE_V2_EXCEPTION_#{Time.now}"
      logger.info e.message
      logger.info e.backtrace
      # File.open("/var/log/flipkart/supply-chain/fk-sc-warehouse-b2b/INVENTORT_RESHUFFLE_EXCEPTION_V2_#{cron_id}", 'a') { |file| file.write("#{warehouse_id} :=> #{e.message} \n #{e.backtrace} \n\n") }
      # message_subject = "Inventory Reshuffle V2 Cron Failed!!! for #{warehouse_id}"
      # message_body = "Cron Failed, \n exception is \n #{e.backtrace} "
      # EmailPublisher.send_mail(message_subject,message_body)
      ScMetrics.increment('inventory_reshuffle_v2.failure')
    ensure
      report_execution_time = (Time.now - report_start_time) * 1000
      ScMetrics.timing('inventory_reshuffle.time', report_execution_time)
    end
  end
end
# Runs inventory_cron forever, sleeping 1800 seconds (30 minutes) after each
# run. Never returns normally; it ends only when an exception escapes
# inventory_cron or sleep.
#
# The closing `end` of this method had page text glued onto it
# ("endEditor is loading..."), which left the method unterminated; it is
# restored here.
def run_inventory_reshuffle_cron
  loop do
    inventory_cron
    sleep 1800
  end
end
# Leave a Comment