Issue
I'm trying to explore the Random Walk algorithm on an infinite line and I'm looking for a way to make it as fast as possible. Here is the code:
import random
from collections import Counter

def run(initial_pos, iterations, trials):
    final_pos = []
    for i in range(0, trials):
        pos = initial_pos
        for j in range(0, iterations):
            if random.choice(["left", "right"]) == "left":
                pos -= 1
            else:
                pos += 1
        final_pos.append(pos)
    return Counter(final_pos)
The variable iterations indicates the number of steps in a single walk, while trials indicates the number of walks.
The runtime is satisfactory when trials and iterations both equal 10^4. However, increasing them to 10^5 requires a long waiting time.
Solution
The Low-hanging Fruit
First of all, let's get rid of those magic numbers and strings, and define some named constants:
RIGHT_NAME, LEFT_NAME = "right", "left"
RIGHT, LEFT = 1, -1
Now, we can rewrite the inner loop body to use those constants:
if random.choice([LEFT_NAME, RIGHT_NAME]) == LEFT_NAME:
    pos += LEFT
else:
    pos += RIGHT
Looking at that, two issues become apparent:
- We're comparing strings for equality. That is unlikely to perform better than comparing small integers.
- We have two pairs of constants representing the same concept (choice between left and right). We're repeating ourselves, and that's rarely a good thing.
Let's eliminate both of those issues by just using the integer constants RIGHT and LEFT. Let's also store the result of random.choice into a temporary to better see the next issue:
current_direction = random.choice([LEFT, RIGHT])
if current_direction == LEFT:
    pos += LEFT
else:
    pos += RIGHT
Now, we can see that current_direction can be either RIGHT or LEFT. If it's LEFT, then we add LEFT to pos, otherwise (the only other choice is RIGHT) we add RIGHT to pos. In other words:
current_direction = random.choice([LEFT, RIGHT])
if current_direction == LEFT:
    pos += current_direction  # current_direction is LEFT
else:
    pos += current_direction  # current_direction is RIGHT
The same thing happens in both branches, so let's get rid of the condition:
pos += random.choice([LEFT, RIGHT])
Here is our new starting point:
def run(initial_pos, iterations, trials):
    final_pos = []
    RIGHT, LEFT = 1, -1
    for i in range(0, trials):
        pos = initial_pos
        for j in range(0, iterations):
            pos += random.choice([LEFT, RIGHT])
        final_pos.append(pos)
    return Counter(final_pos)
Now, if we look at the code critically (keeping the low-level details of the bytecode and interpreter in mind -- the dis module helps here; see the sketch after the list below), we can see some other deficiencies:
- The call range(0, trials) is functionally identical to range(trials). However, that redundant 0 means an extra opcode to push the constant onto the stack. We don't want to waste time on useless things, right?
- In our inner-most loop, we're calling a function with the argument [LEFT, RIGHT]. Python doesn't optimize such things, so that means we're creating the same list iterations * trials times. Even though it's just 3 opcodes, let's do it once, and then reuse the same list. We might as well make it a tuple, since it doesn't need to change.
- The class collections.Counter supports updates. So let's avoid the intermediate final_pos list, and update the Counter directly.
- If we're hunting cycles, we can also note that a call to random.choice involves two opcodes (first get random, then find choice). We can cache the actual functions to call in local variables to avoid the extra step.
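For reference, here's a minimal sketch of how the dis module can be used to look at the bytecode of the inner-loop body (the wrapper function exists only so there is something to disassemble; it's not part of the answer's code):

import dis
import random

def inner_step(pos, LEFT, RIGHT):
    # The same statement as in our inner loop.
    pos += random.choice([LEFT, RIGHT])
    return pos

# Prints the opcodes, including the list construction and the
# random / choice attribute lookups discussed above.
dis.dis(inner_step)

Putting all of those changes together: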
def run_v1(initial_pos, iterations, trials):
    RIGHT, LEFT = 1, -1
    CHOICES = (RIGHT, LEFT)
    result = Counter()
    random_choice = random.choice
    result_update = result.update
    for i in range(trials):
        pos = initial_pos
        for j in range(iterations):
            pos += random_choice(CHOICES)
        result_update([pos])
    return result
Those initial changes only bring the runtime down to about 90%-95% of the original, but they give us a solid basis for further optimizations.
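The relative timings quoted throughout this answer can be measured with something along these lines (a minimal sketch using timeit; the argument values here are just examples, not the exact workload from the question):

import timeit

# Time one full invocation of each version on the same workload.
for fn in (run, run_v1):
    elapsed = timeit.timeit(lambda: fn(0, 10_000, 1_000), number=1)
    print(f"{fn.__name__}: {elapsed:.2f} s")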
Eliminating Inner Loop — Attempt 1
Right now our inner loop basically computes the sum of a list of random choices, offset by initial_pos. Let's use random.choices instead to generate iterations choices in one call, and the builtin sum to add them up.
def run_v2(initial_pos, iterations, trials):
    RIGHT, LEFT = 1, -1
    CHOICES = (RIGHT, LEFT)
    result = Counter()
    random_choices = random.choices
    result_update = result.update
    for i in range(trials):
        result_update([initial_pos + sum(random_choices(CHOICES, k=iterations))])
    return result
This requires roughly 25% of the time required by run_v1.
Eliminating Inner Loop — Attempt 2
The main problem with the previous version is that it ends up allocating a lot of intermediate Python objects (one for each choice). A more efficient way of using the memory would be to use numpy arrays and the various functions the library provides. For example, we could use numpy.Generator.choice and the sum method of numpy arrays:
import numpy as np

def run_v3(initial_pos, iterations, trials):
    RIGHT, LEFT = 1, -1
    CHOICES = (RIGHT, LEFT)
    result = Counter()
    rng = np.random.default_rng()
    rng_choice = rng.choice
    result_update = result.update
    for i in range(trials):
        result_update([initial_pos + rng_choice(CHOICES, size=iterations).sum()])
    return result
This version requires around 20% of the time required by run_v2.
More Efficient Choices
Currently, the random choices require an indirect lookup to map into our desired set of (1, -1). Let's observe that iterations = right_count + left_count. We already know the value of iterations, so as long as we know one of right_count or left_count, we can calculate the other. The offset from the starting position is right_count - left_count = right_count - (iterations - right_count), therefore pos_offset = 2 * right_count - iterations, and we can implement it as such (rng_choice(2, size=iterations) draws from {0, 1}, and the sum counts the steps to the right):
def run_v4(initial_pos, iterations, trials):
    result = Counter()
    rng = np.random.default_rng()
    rng_choice = rng.choice
    result_update = result.update
    for i in range(0, trials):
        result_update([initial_pos + 2 * rng_choice(2, size=iterations).sum() - iterations])
    return result
This time it requires about 60%-90% of the time of run_v3.
Further Variations
Using np.random.default_rng().integers instead:
def run_v5a(initial_pos, iterations, trials):
    final_pos = []
    rng = np.random.default_rng()
    rng_integers = rng.integers
    for i in range(0, trials):
        pos = initial_pos + 2 * rng_integers(2, size=iterations).sum() - iterations
        final_pos.append(pos)
    return Counter(final_pos)

def run_v5b(initial_pos, iterations, trials):
    final_pos = []
    rng = np.random.default_rng()
    rng_integers = rng.integers
    for i in range(0, trials):
        pos = initial_pos + 2 * rng_integers(2, dtype=np.uint8, size=iterations).sum(dtype=np.int32) - iterations
        final_pos.append(pos)
    return Counter(final_pos)
Using more memory to eliminate the outer loop:
def run_v6a(initial_pos, iterations, trials):
    rng = np.random.default_rng()
    final_pos = initial_pos + 2 * rng.integers(2, size=(trials, iterations)).sum(axis=1) - iterations
    return Counter(final_pos)

def run_v6b(initial_pos, iterations, trials):
    rng = np.random.default_rng()
    final_pos = initial_pos + 2 * rng.integers(2, dtype=np.uint8, size=(trials, iterations)).sum(axis=1, dtype=np.int32) - iterations
    return Counter(final_pos)
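One caveat (a back-of-the-envelope check, not from the original answer): eliminating the outer loop means the whole (trials, iterations) sample array has to fit in memory at once, which is where the smaller dtype in run_v6b pays off:

# For the 10**5 x 10**5 case from the question:
trials = iterations = 10**5
print(trials * iterations * 8 / 2**30)  # ~74.5 GiB with the default int64 samples
print(trials * iterations * 1 / 2**30)  # ~9.3 GiB with uint8 samples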
As suggested by Jérôme Richard, we can instead sample a binomial distribution. Applying the math above and using np.random.binomial:
def run_v7(initial_pos, iterations, trials):
    final_pos = initial_pos + 2 * np.random.binomial(iterations, 0.5, trials) - iterations
    return Counter(final_pos)
This runs in about 1/4 of a second with both iterations and trials equal to 10^6.
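As a quick sanity check (the helper and the argument values below are just illustrative, not part of the original answer), the binomial version can be compared against the original implementation; both distributions of final positions should be centred on initial_pos:

def counter_mean(counts):
    # Weighted mean of the final positions in a Counter.
    total = sum(counts.values())
    return sum(pos * n for pos, n in counts.items()) / total

print(counter_mean(run(0, 100, 10_000)))     # original implementation
print(counter_mean(run_v7(0, 100, 10_000)))  # binomial-based version
# Both means should be close to the starting position, 0.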
Answered By - Dan Mašek