Part 1 code

 avatar
unknown
python
2 months ago
5.3 kB
1
Indexable
import numpy as np
import scipy.stats as sstats
import copy as cp

p = 1/10
q = 2/5
r = 2


def sample_markov_chain(P, start_state, steps):
    """
    Sample a trajectory from a Markov chain given a transition matrix.

    Parameters:
        P (ndarray): Transition probability matrix.
        start_state (int): Initial state of the Markov chain.
        steps (int): Number of steps to sample.

    Returns:
        list: A list of states representing the sampled trajectory.
    """
    states = [start_state]
    current_state = start_state
    for _ in range(steps):
        current_state = np.random.choice(np.arange(P.shape[0]), p=P[current_state])
        states.append(current_state)
    return states

epsilon = sstats.nbinom(r,q) #probability of epsilon new fatties
D = sstats.bernoulli(p) #Probability of disappearance

N = 11
P = np.zeros((N,N))


def getNStepMatrix(P,n):
    P_n = P
    for i in range(n-1):
        P_n = P@P_n
    return P_n


#GENERATED BY CHATGPT
def getStationaryMatrix(P):
    # Solve for the stationary distribution
    A = P.T - np.eye(N)  # P.T ensures the equation pi @ P = pi
    A[-1, :] = 1         # Replace the last equation with normalization: sum(pi) = 1
    b = np.zeros(N)
    b[-1] = 1            # Right-hand side for normalization

    # Solve the system
    pi = np.linalg.solve(A, b)

    assert(sum(pi) == 1)
    return pi


def get_transient_matrix(P):
    """
    Extracts the 9x9 transient matrix S from the transition probability matrix P.
    Absorbing states (0 and 10) are excluded.

    Parameters:
        P (ndarray): The original transition probability matrix (11x11).

    Returns:
        ndarray: The 9x9 transient matrix S.
    """
    transient_states = list(range(1, 10))  # States 1 to 9 are transient
    S = P[np.ix_(transient_states, transient_states)]  # Extract rows and columns for transient states
    return S

def getSurvivalT(P,t,M = 100000):
    empirical_data = [sample_markov_chain(P,5,t+1) for _ in range(M)]
    prop = 0

    for data in empirical_data:
        if (0 in data) or (10 in data):
            prop += 1

    print(f"Through empirical tests we get that the probability of T>{t} is approx: {(M - prop) / M}")

    #Theoretical value
    S = get_transient_matrix(P)
    alpha = [0,0,0,0,1,0,0,0,0]

    St = S
    for _ in range(t):
        St = St@S

    print(f"The real probability being: {np.sum(alpha@St)}")


def get_hitting_probabilities(P):
    """
    Computes the hitting probabilities for absorbing states in the Markov chain.

    Parameters:
        P (ndarray): Transition probability matrix with absorbing states.

    Returns:
        ndarray: A matrix containing hitting probabilities for each absorbing state.
    """
    # Define transient states and absorbing states
    transient_states = list(range(1, 10))  # Transient states: 1 to 9
    absorbing_states = [0, 10]  # Absorbing states: 0 and 10

    # Extract Q (transient-transient) and R (transient-absorbing) matrices
    Q = P[np.ix_(transient_states, transient_states)]
    R = P[np.ix_(transient_states, absorbing_states)]

    # Compute the fundamental matrix N = (I - Q)^(-1)
    I = np.eye(Q.shape[0])  # Identity matrix of size Q
    N = np.linalg.inv(I - Q)

    # Compute hitting probabilities B = N * R
    B = N @ R

    return B


if __name__ == "__main__":

    #What we had starting the last month
    for j in range(0,N):

        #What we will have starting this month
        for i in range(0,N):

            #No new patients or we get resat while still at 0 patients
            if(i == 0 and j == 0):
                P[j][i] = D.pmf(1) + epsilon.pmf(0)*D.pmf(0)

            #We get resat
            elif(i == 0 and j != 0):
                P[j][i] = D.pmf(1)

            #We get a value below our current value but no reset
            elif(i<j and i != 0):
                P[j][i] = 0

            elif(i>=j and i<(N-1)):
                P[j][i] = epsilon.pmf(i-j)*D.pmf(0)
            elif(i == (N-1)):
                #The CDF is inclusive, so in order to get F(0)=0, we need to take F(-1).
                P[j][i] = (1 - epsilon.cdf(10-j-1))*D.pmf(0)

    print(np.array2string(P, precision=2, separator=', ', floatmode='fixed'))
    #Now we get the 12-step probability
    P12 = getNStepMatrix(P,12)

    #And we calculate the mean value
    P120 = P12[0]
    E_12 = 0

    for k,pij in enumerate(P120):
        E_12 += k*pij

    print(f"The expected value after 12 steps is: {E_12}")

    #Now we calculate the stationary distribution - ChatGPT solved this
    pi = getStationaryMatrix(P)
    E_infty = 0
    V_infty = 0

    for k,pij in enumerate(pi):
        E_infty += k*pij

    for k,pij in enumerate(pi):
        V_infty += (E_infty-k)**2*pij

    print(f"Expected stationary: {E_infty} and Var: {V_infty}")

    #Question 4 - M = 1, no empiricales
    getSurvivalT(P,23,M=1)

    #Question 5 - first step analysis
    P_absorbing = cp.deepcopy(P)
    for i in [0,10]:
        for j in range(11):
            if(i != j):
                P_absorbing[i][j] = 0
            else:
                P_absorbing[i][j] = 1
    print(P)
   #print(get_hitting_probabilities(P_absorbing)[4])
    print(getStationaryMatrix(P))
    Epsilon = sstats.nbinom(r*24,q)
    print(1-Epsilon.cdf(4))
Leave a Comment