Issue
The task is to combine two arrays row by row (construct the permutations) based on the resulting multiplication of two corresponding vectors. Such as:
Row1_A, Row2_A, Row3_A,
Row1_B, Row2_B, Row3_B,
The result should be: Row1_A_Row1_B, Row1_A_Row2_B, Row1_A_Row3_B, Row2_A_Row1_B, etc..
Given the following initial arrays:
n_rows = 1000
A = np.random.randint(10, size=(n_rows, 5))
B = np.random.randint(10, size=(n_rows, 5))
P_A = np.random.rand(n_rows, 1)
P_B = np.random.rand(n_rows, 1)
Arrays P_A and P_B are corresponding vectors to the individual arrays, which contain a float. The combined rows should only appear in the final array if the multiplication surpasses a certain threshold, for example:
lim = 0.8
I have thought of the following functions or ways to solve this problem, but I would be interested in faster solutions. I am open to using numba or other libraries, but ideally I would like to improve the vectorized solution using numpy.
Method A
def concatenate_per_row(A, B):
m1,n1 = A.shape
m2,n2 = B.shape
out = np.zeros((m1,m2,n1+n2),dtype=A.dtype)
out[:,:,:n1] = A[:,None,:]
out[:,:,n1:] = B
return out.reshape(m1*m2,-1)
%%timeit
A_B = concatenate_per_row(A, B)
P_A_B = (P_A[:, None]*P_B[None, :])
P_A_B = P_A_B.flatten()
idx = P_A_B > lim
A_B = A_B[idx, :]
P_A_B = P_A_B[idx]
37.8 ms ± 660 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
Method B
%%timeit
A_B = []
P_A_B = []
for i in range(len(P_A)):
P_A_B_i = P_A[i]*P_B
idx = np.where(P_A_B_i > lim)[0]
if len(idx) > 0:
P_A_B.append(P_A_B_i[idx])
A_B_i = np.zeros((len(idx), A.shape[1] + B.shape[1]), dtype='int')
A_B_i[:, :A.shape[1]] = A[i]
A_B_i[:, A.shape[1]:] = B[idx, :]
A_B.append(A_B_i)
A_B = np.concatenate(A_B)
P_A_B = np.concatenate(P_A_B)
9.65 ms ± 291 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
Solution
First of all, there is a more efficient algorithm. Indeed, you can pre-compute the size of the output array so the values can be directly written in the final output arrays rather than stored temporary in lists. To find the size efficiently, you can sort the array P_B
and then do a binary search so to find the number of value greater than lim/P_A[i,0]
for all possible i
(P_B*P_A[i,0] > lim
is equivalent to P_B > lim/P_A[i,0]
). The number of item filtered for each i
can be temporary stored so to quickly loop over the filtered items.
Moreover, you can use Numba to significantly speed the computation of the main loop up.
Here is the resulting code:
@nb.njit('(int_[:,::1], int_[:,::1], float64[:,::1], float64[:,::1])')
def compute(A, B, P_A, P_B):
assert P_A.shape[1] == 1
assert P_B.shape[1] == 1
P_B_sorted = np.sort(P_B.reshape(P_B.size))
counts = len(P_B) - np.searchsorted(P_B_sorted, lim/P_A[:,0], side='right')
n = np.sum(counts)
mA, mB = A.shape[1], B.shape[1]
m = mA + mB
A_B = np.empty((n, m), dtype=np.int_)
P_A_B = np.empty((n, 1), dtype=np.float64)
k = 0
for i in range(P_A.shape[0]):
if counts[i] > 0:
idx = np.where(P_B > lim/P_A[i, 0])[0]
assert counts[i] == len(idx)
start, end = k, k + counts[i]
A_B[start:end, :mA] = A[i, :]
A_B[start:end, mA:] = B[idx, :]
P_A_B[start:end, :] = P_B[idx, :] * P_A[i, 0]
k += counts[i]
return A_B, P_A_B
Here are performance results on my machine:
Original: 35.6 ms
Optimized original: 18.2 ms
Proposed (with order): 0.9 ms
Proposed (no ordering): 0.3 ms
The algorithm proposed above is 20 times faster than the original optimized algorithm. It can be made even faster. Indeed, if the order of items do not matter you can use an argsort
so to reorder both B
and P_B
. This enable you not to compute idx
every time in the hot loop and select directly the last elements from B
and P_B
(that are guaranteed to be higher than the threshold but not in the same order than the original code). Because the selected items are stored contiguously in memory, this implementation is much faster. In the end, this last implementation is about 60 times faster than the original optimized algorithm. Note that the proposed implementations are significantly faster than the original ones even without Numba.
Here is the implementation that do not care about the order of the items:
@nb.njit('(int_[:,::1], int_[:,::1], float64[:,::1], float64[:,::1])')
def compute(A, B, P_A, P_B):
assert P_A.shape[1] == 1
assert P_B.shape[1] == 1
nA, mA = A.shape
nB, mB = B.shape
m = mA + mB
order = np.argsort(P_B.reshape(nB))
P_B_sorted = P_B[order, :]
B_sorted = B[order, :]
counts = nB - np.searchsorted(P_B_sorted.reshape(nB), lim/P_A[:,0], side='right')
nRes = np.sum(counts)
A_B = np.empty((nRes, m), dtype=np.int_)
P_A_B = np.empty((nRes, 1), dtype=np.float64)
k = 0
for i in range(P_A.shape[0]):
if counts[i] > 0:
start, end = k, k + counts[i]
A_B[start:end, :mA] = A[i, :]
A_B[start:end, mA:] = B_sorted[nB-counts[i]:, :]
P_A_B[start:end, :] = P_B_sorted[nB-counts[i]:, :] * P_A[i, 0]
k += counts[i]
return A_B, P_A_B
Answered By - Jérôme Richard
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.