Finding the best solution for a problem can require many attempts, thus finding other than the expected result, also the process which can give us the best performance.
In this post, I will describe the solution for a classic problem, to find the closest pair of points in a plan.
Suppose that a region has 50 users of a logistic enterprise. The company needs to discover which users are the closest to one another.
A common way to think about it is to calculate all of the possible combinations for the users, two by two, and choose those that has a minimal distance between both of them. We call this method as brute force.
But, this solution could cost a processing as heavy as n². In other words, the cost for this solution grows exponentially following the n increasing.
If we think of 1 million users, we’re talking about an order of 1 TRILION processing calculations. Indeed, one milion is not great enough today, in terms of big data.
Another way, more intelligent, is to use the algorithm called DIVIDE AND CONQUER.
This algorithm has the finality of dividing the problem into the smallest pieces (DIVIDE) and join all of the found solutions in each piece (CONQUER).
The cost for this processing is very small in comparison to brute force. We can obtain an order cost of n log(n).
Data Format
Before we begin, let’s suppose that the data comes in JSON format:
{
'place_a': ['coord_x', 'coord_y'],
'place_b': ['coord_x', 'coord_y'],
'place_c': ['coord_x', 'coord_y'],
'place_d': ['coord_x', 'coord_y'],
...
}
As a example, let’s use those 20 random location points:
_dict = {
"onfsc": [78.4, 78.1],
"gtbgn": [5.5, 8.4],
"aseoh": [51.0, 52.1],
"zzhwe": [89.0, 15.9],
"trvki": [19.9, 35.3],
"fpnwj": [28.9, 5.3],
"zznss": [59.9, 14.5],
"vzema": [70.7, 69.8],
"rghob": [27.3, 31.1],
"xhyub": [63.9, 64.1],
"budfa": [79.7, 59.6],
"xbkbj": [32.1, 46.6],
"uoopk": [46.5, 75.1],
"aaocg": [29.4, 67.6],
"wzkfl": [90.7, 24.8],
"vrnef": [57.8, 19.2],
"cwyix": [11.1, 14.4],
"tbpit": [28.6, 45.8],
"pwnhf": [94.5, 82.3],
"banvu": [25.7, 25.6]
}
Divide and Conquer
We can begin splitting the solution in two steps. First, we will sort the points based in X axis. Conventionally, we sort in X axis, but if we use Y axis instead, the result is the same.
FIRST STEP
We can create a called Places class:
from random import uniform, choice
import string
class Places:
def __init__(self, places=None, n=None, amplitude=100):
self.places = places
if not places:
assert n, "if you don't give the places, give me a number of places to generate random n places"
letters = string.ascii_lowercase
self.places = {''.join(choice(letters) for i in range(5)): [uniform(0,amplitude), uniform(0,amplitude)] for j in range(n)}
else:
assert type(places) == dict, "places must be a dict, if you refering to n, please name it"
When we instantiate this class, we have 2 two options, giving the dict of points, or giving a n value for the class could generate n random points.
For the points sorting, let’s convert the dict to a list, which will store as the coordinates, as the name of each point. Adding the dict_to_list method to the class:
def dict_to_list(self, _dict):
return list(zip(_dict.values(), _dict.keys()))
The new format look like this:
# _list
[
([78.4, 78.1], "onfsc" ),
([5.5, 8.4], "gtbgn" ),
([51.0, 52.1], "aseoh" ),
([89.0, 15.9], "zzhwe" ),
([19.9, 35.3], "trvki" ),
([28.9, 5.3], "fpnwj" ),
([59.9, 14.5], "zznss" ),
([70.7, 69.8], "vzema" ),
([27.3, 31.1], "rghob" ),
([63.9, 64.1], "xhyub" ),
([79.7, 59.6], "budfa" ),
([32.1, 46.6], "xbkbj" ),
([46.5, 75.1], "uoopk" ),
([29.4, 67.6], "aaocg" ),
([90.7, 24.8], "wzkfl" ),
([57.8, 19.2], "vrnef" ),
([11.1, 14.4], "cwyix" ),
([28.6, 45.8], "tbpit" ),
([94.5, 82.3], "pwnhf" ),
([25.7, 25.6], "banvu" )
]
For sorting, we use an algorithm called merge and sort. This method is based on the divide and conquer algorithm.
Merge and sort consists of spliting the points list in smaller lists, until we can have one element by list. In a function, we can use recursivity to obtain this.
After the division, we go through the conquer part. This part, we compare each list to the other, always looking to the first element of each list and dropping the smaller. The third sorted list is complete when we consume both lists.
Think of a scenario where you have two sorted cards stacks, you need merge both to make a third stack. You can catch every card on top, only the smaller from the top, and construct the third stack from the others two stacks.
See the example below:
In python, we can make in this way:
def merge_list(self, list_a, list_b):
ordered = []
while list_a or list_b:
if len(list_a) and len(list_b):
if list_a[0][0] < list_b[0][0]:
ordered.append(list_a.pop(0))
else:
ordered.append(list_b.pop(0))
elif len(list_a):
ordered.extend(list_a)
list_a = []
elif len(list_b):
ordered.extend(list_b)
list_b = []
return ordered
def merge_sort(self, _list):
ordered = []
if len(_list) > 1:
mid = len(_list) // 2
left = self.merge_sort(_list[0:mid])
right = self.merge_sort(_list[mid:])
ordered = self.merge_list(left, right)
return ordered
else:
return _list
The merge_sort divides the list recursively until all lists have just one element. After that, the lists are merged (from this the name merge) two at a time and creating a third list, already sorted.
SECOND STEP
After we sorted the points by an axis, we can split again, using the median, sucessivaly until we get 2 or 3 points in each slice of the plan.
In the slice with 2 or 3 points, we can use brute force to get the smallest distance among the points.
We use euclidean distance between two points. This is a simple solution that is not precise for geographical coordinates, like longitude and latitude, but in our example, it’s enough to.
from numpy import sqrt
def distance(self, point_1, point_2):
return sqrt(sum(((point_1[0] - point_2[0])**2, (point_1[1] - point_2[1])**2)))
To know the closest points using brute force when we have up to 3 points, we can def a brute_force method:
def brute_force_distance(self, _list):
"""
format list [((x1, y1), name1), ((x2, y2), name2), ...]
"""
assert len(_list) <= 3, "Please, don't ask me to do what I can't do"
results = []
for i, location in enumerate(_list):
for j, neighbor in enumerate(_list[i+1:]):
results.append((self.distance(location[0], neighbor[0]),
location[1], neighbor[1]))
return min(results)
When we get the closest distance (δ) in each slice, we use this distance to compare the points near the division line of the slice.
All of the points inside the band from the left side will be calculated by the distance. We need to consider only the six next points for each point. But, why only 6?
Below I will try to explain:
Suppose P is a left side point, the maximum number of points on the right side that could be closest than δ is 6. Otherwise, the distance between those points would be smaller than δ, which is not true, because δ is already the smallest value on both sides.
So, for each point on the left side, we calculate for the next 6 points.
When we get the smallest value for each slice and each band, we can evolute recursively until get the closest pair in all of the plan.
In python, we can also solve this recursively:
#list format [[(x,y), name]]
def closest_pair(self, _list=None): # must be a sorted list
if not _list:
_list = self.dict_to_list(self.places)
_list = self.merge_sort((_list))
if len(_list) > 3:
middle = len(_list) // 2
left = self.closest_pair(_list[:middle])
right = self.closest_pair(_list[middle:])
closest = left if min(left[0], right[0]) == left[0] else right
return self.min_from_middle(middle, _list, closest)
else:
return self.brute_force_distance(_list)
def min_from_middle(self, middle, _list, closest):
middle_x = (_list[middle][0][0] + _list[middle - 1][0][0]) / 2
upper = middle_x + closest[0]
lower = middle_x - closest[0]
points = [item for item in _list if (lower <= item[0][0] <= upper)]
for i, point in enumerate(points):
for j, neighbor in enumerate(points[min(i + 1, len(points)): min(len(points),i+6)]):
dist = self.distance(point[0], neighbor[0])
if dist < closest[0]:
closest = dist, point[1], neighbor[1]
return closest
The closest_pair method returns the smallest distance and the two name points:
>> closest_pair(_list)
(3.5902646142032495, 'tbpit', 'xbkbj')
Plotting
We can improve our work, adding a method that allows the points plotting in a graph.
For this, we use the awesome lib Plotly.
import plotly.graph_objs as go
def plot(self, show_closest=False):
x, y = zip(*self.places.values())
fig = go.Figure()
fig.add_trace(
go.Scatter(
x=x,
y=y,
mode='markers'
)
)
if show_closest:
closest = self.closest_pair()[1:]
x0 = self.places[closest[0]][0]
y0 = self.places[closest[0]][1]
x1 = self.places[closest[1]][0]
y1 = self.places[closest[1]][1]
fig.add_trace(
go.Scatter(
x=[x0, x1],
y=[y0, y1],
mode='markers',
marker=dict(
size=16,
color="red",
)
)
)
fig.show()
If we set show_closest=True in method, the closest points are featured.
Ready, we have concluded our class. Let's taste it.
places = Places(places=_dict)
places.plot()
Now, with the closest pair featured:
places = Places(places=_dict)
places.plot(True)
Our script can receive a n variable by command line adding this:
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("n", help="number of places for generate")
args = parser.parse_args()
n = int(args.n)
places = Places(n=n)
We can also implement the amplitude variable and/or set a path for the JSON file through the command line. But this could be your homework!
Final Version
Our final script became this way:
from random import uniform, choice
import string
from numpy import sqrt
import plotly.graph_objs as go
class Places:
def __init__(self, places=None, n=None, amplitude=100):
self.places = places
if not places:
assert n, "if you don't give the places, give me a number of places to generate random n places"
letters = string.ascii_lowercase
self.places = {''.join(choice(letters) for i in range(5)): [uniform(0,amplitude), uniform(0,amplitude)] for j in range(n)}
else:
assert type(places) == dict, "places must be a dict, if you refering to n, please name it"
def merge_list(self, list_a, list_b):
ordered = []
while list_a or list_b:
if len(list_a) and len(list_b):
if list_a[0][0] < list_b[0][0]:
ordered.append(list_a.pop(0))
else:
ordered.append(list_b.pop(0))
elif len(list_a):
ordered.extend(list_a)
list_a = []
elif len(list_b):
ordered.extend(list_b)
list_b = []
return ordered
def merge_sort(self, _list):
ordered = []
if len(_list) > 1:
mid = len(_list) // 2
left = self.merge_sort(_list[0:mid])
right = self.merge_sort(_list[mid:])
ordered = self.merge_list(left, right)
return ordered
else:
return _list
def distance(self, point_1, point_2):
return sqrt(sum(((point_1[0] - point_2[0])**2, (point_1[1] - point_2[1])**2)))
def dict_to_list(self, _dict):
return list(zip(_dict.values(), _dict.keys()))
def brute_force_distance(self, _list):
"""
format list [((x1, y1), name1), ((x2, y2), name2), ...]
"""
assert len(_list) <= 3, "Please, don't ask me to do what I can't do"
results = []
for i, location in enumerate(_list):
for j, neighbor in enumerate(_list[i+1:]):
results.append((self.distance(location[0], neighbor[0]),
location[1], neighbor[1]))
return min(results)
#list format [[(x,y), name]]
def closest_pair(self, _list=None): # must be a sorted list
if not _list:
_list = self.dict_to_list(self.places)
_list = self.merge_sort((_list))
if len(_list) > 3:
middle = len(_list) // 2
left = self.closest_pair(_list[:middle])
right = self.closest_pair(_list[middle:])
closest = left if min(left[0], right[0]) == left[0] else right
return self.min_from_middle(middle, _list, closest)
else:
return self.brute_force_distance(_list)
def min_from_middle(self, middle, _list, closest):
middle_x = (_list[middle][0][0] + _list[middle - 1][0][0]) / 2
upper = middle_x + closest[0]
lower = middle_x - closest[0]
points = [item for item in _list if (lower <= item[0][0] <= upper)]
for i, point in enumerate(points):
for j, neighbor in enumerate(points[min(i + 1, len(points)): min(len(points),i+6)]):
dist = self.distance(point[0], neighbor[0])
if dist < closest[0]:
closest = dist, point[1], neighbor[1]
return closest
def plot(self, show_closest=False):
x, y = zip(*self.places.values())
fig = go.Figure()
fig.add_trace(
go.Scatter(
x=x,
y=y,
mode='markers'
)
)
if show_closest:
closest = self.closest_pair()[1:]
x0 = self.places[closest[0]][0]
y0 = self.places[closest[0]][1]
x1 = self.places[closest[1]][0]
y1 = self.places[closest[1]][1]
fig.add_trace(
go.Scatter(
x=[x0, x1],
y=[y0, y1],
mode='markers',
marker=dict(
size=16,
color="red",
)
)
)
fig.show()
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("n", help="number of places for generate")
args = parser.parse_args()
n = int(args.n)
places = Places(n=n)
All of the code is available in my Github profile.