This notebook applies the D-Wave Advantage quantum annealer to a classic unsupervised learning problem: we frame k-means clustering as a quadratic unconstrained binary optimization (QUBO) problem and evaluate the results against classical, iterative algorithms.
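For reference, a QUBO seeks a binary vector that minimizes a quadratic objective; the general form (standard background, not specific to this notebook) is

$$\min_{x \in \{0,1\}^n} \; \sum_{i} Q_{i,i}\, x_i + \sum_{i<j} Q_{i,j}\, x_i x_j,$$

where the off-diagonal coefficients $Q_{i,j}$ play the role of the pairwise interaction weights constructed below.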
In this formulation of the problem, we allocate three qubits on the D-Wave system for every instance in the dataset, where each qubit represents membership in one cluster. If, for example, the minimum-energy anneal returned the state {1, 0, 0} (equivalently, spins {↑, ↓, ↓}) for a data point, this would indicate that the instance belongs exclusively to the first cluster. To ensure this exclusivity, we also set a constraint restricting the valid arrangements of qubits to those in which exactly one of the three membership qubits is up and the others are down (i.e. {1, 0, 0}, {0, 1, 0}, or {0, 0, 1}).
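As a minimal sketch of this encoding (assuming three clusters, and mirroring the choose_one_group constraint defined in the code below), the valid one-hot states can be checked in plain Python:

# One-hot encoding sketch: each data point gets one membership qubit per
# cluster; only states with exactly one qubit up are valid assignments.
VALID_STATES = {(1, 0, 0), (0, 1, 0), (0, 0, 1)}

def is_valid_assignment(qubits):
    """Return True when exactly one membership qubit is set."""
    return tuple(qubits) in VALID_STATES  # equivalently: sum(qubits) == 1

print(is_valid_assignment((1, 0, 0)))  # True: belongs to the first cluster
print(is_valid_assignment((1, 1, 0)))  # False: ambiguous membership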
We then iterate over all pairs of instances in the dataset and compute the simple Euclidean distance for each pair. We use these (normalized) distances as the weights on the $Q_{i,j}$ quadratic interaction terms in the QUBO problem. Each distance $d$ is fed through $-\cos(\pi d)$ for same-cluster interaction terms and $-\tanh(d) \times 0.1$ for different-cluster interaction terms, so that similar instances grouped together are rewarded with low energy values and dissimilar pairs are penalized.
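These two functions appear later in the code as same_weight and diff_weight; the following standalone sketch (using the 0.1 scale factor from the 2-D example) shows how a normalized distance d in [0, 1] maps to interaction strengths:

import math

def same_cluster_weight(d):
    # -1 at d = 0 (strong reward for grouping nearby points together),
    # rising to +1 at d = 1 (penalizing distant points in the same cluster).
    return -math.cos(d * math.pi)

def different_cluster_weight(d, scale=0.1):
    # Grows more negative with distance: distant pairs are mildly
    # rewarded for landing in different clusters.
    return -math.tanh(d) * scale

for d in (0.0, 0.5, 1.0):
    print(f"d={d}: same={same_cluster_weight(d):+.3f}, diff={different_cluster_weight(d):+.3f}")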
You can find the associated write-up for this project at this link.
Conor McCormack, Fall 2020
EE 520: Quantum Information Processing
University of Southern California, Viterbi School of Engineering.
import math
import numpy as np
import dwavebinarycsp
import dwave.inspector
from dwave.system import EmbeddingComposite, DWaveSampler
from collections import defaultdict
import matplotlib
matplotlib.use("agg")
import matplotlib.pyplot as plt
Code adapted from: https://github.com/dwave-examples/clustering
class Coordinate:
def __init__(self, x, y):
self.x = x
self.y = y
# coordinate labels for groups red, green, and blue
label = "{0},{1}_".format(x, y)
self.r = label + "r"
self.g = label + "g"
self.b = label + "b"
def get_distance(coordinate_0, coordinate_1):
diff_x = coordinate_0.x - coordinate_1.x
diff_y = coordinate_0.y - coordinate_1.y
return math.sqrt(diff_x**2 + diff_y**2)
def get_max_distance(coordinates):
max_distance = 0
for i, coord0 in enumerate(coordinates[:-1]):
for coord1 in coordinates[i+1:]:
distance = get_distance(coord0, coord1)
max_distance = max(max_distance, distance)
return max_distance
def get_groupings(sample):
"""Grab selected items and group them by color"""
colored_points = defaultdict(list)
for label, bool_val in sample.items():
# Skip over items that were not selected
if not bool_val:
continue
        # Note: labels look like "<x_coord>,<y_coord>_<color>"
        # Skip auxiliary variables introduced by dwavebinarycsp.stitch
        if len(label.split("_")) < 2:
            continue
        coord, color = label.split("_")
coord_tuple = tuple(map(float, coord.split(",")))
colored_points[color].append(coord_tuple)
return dict(colored_points)
def visualize_groupings(groupings_dict, filename):
"""
Args:
groupings_dict: key is a color, value is a list of x-y coordinate tuples.
For example, {'r': [(0,1), (2,3)], 'b': [(8,3)]}
filename: name of the file to save plot in
"""
for color, points in groupings_dict.items():
# Ignore items that do not contain any coordinates
if not points:
continue
# Populate plot
point_style = color + "o"
plt.plot(*zip(*points), point_style)
plt.savefig(filename)
def visualize_scatterplot(x_y_tuples_list, filename):
"""Plotting out a list of x-y tuples
Args:
x_y_tuples_list: A list of x-y coordinate values. e.g. [(1,4), (3, 2)]
"""
plt.plot(*zip(*x_y_tuples_list), "o")
plt.savefig(filename)
def cluster_points(scattered_points, filename):
# Set up problem
# Note: max_distance gets used in division later on. Hence, the max(.., 1)
# is used to prevent a division by zero
coordinates = [Coordinate(x, y) for x, y in scattered_points]
max_distance = max(get_max_distance(coordinates), 1)
# Build constraints
csp = dwavebinarycsp.ConstraintSatisfactionProblem(dwavebinarycsp.BINARY)
# Apply constraint: coordinate can only be in one color group
choose_one_group = {(0, 0, 1), (0, 1, 0), (1, 0, 0)}
for coord in coordinates:
csp.add_constraint(choose_one_group, (coord.r, coord.g, coord.b))
# Build initial BQM
bqm = dwavebinarycsp.stitch(csp)
# Edit BQM to bias for close together points to share the same color
for i, coord0 in enumerate(coordinates[:-1]):
for coord1 in coordinates[i+1:]:
d = get_distance(coord0, coord1) / max_distance
same_weight = -math.cos(d*math.pi)
bqm.add_interaction(coord0.r, coord1.r, same_weight)
bqm.add_interaction(coord0.g, coord1.g, same_weight)
bqm.add_interaction(coord0.b, coord1.b, same_weight)
diff_weight = -math.tanh(d) * 0.1
bqm.add_interaction(coord0.r, coord1.b, diff_weight)
bqm.add_interaction(coord0.r, coord1.g, diff_weight)
bqm.add_interaction(coord0.b, coord1.r, diff_weight)
bqm.add_interaction(coord0.b, coord1.g, diff_weight)
bqm.add_interaction(coord0.g, coord1.r, diff_weight)
bqm.add_interaction(coord0.g, coord1.b, diff_weight)
# Submit problem to D-Wave sampler
sampler = EmbeddingComposite(DWaveSampler())
    # Note: we request 1000 anneal reads from the D-Wave system
sampleset = sampler.sample(bqm, chain_strength=4, num_reads=1000)
best_sample = sampleset.first.sample
# Open Inspector
dwave.inspector.show(bqm, sampleset)
# Visualize solution
groupings = get_groupings(best_sample)
visualize_groupings(groupings, filename)
# Print solution onto terminal
# Note: This is simply a more compact version of 'best_sample'
print(groupings)
We use numpy to randomly sample three points from each of three separate multivariate Gaussian distributions and run our clustering QUBO on the resulting data.
# Set up three different clusters of data points
covariance = [[3, 0], [0, 3]]
n_points = 3
x0, y0 = np.random.multivariate_normal([0, 0], covariance, n_points).T
x1, y1 = np.random.multivariate_normal([12, 14], covariance, n_points).T
x2, y2 = np.random.multivariate_normal([8, 3], covariance, n_points).T
xs = np.hstack([x0, x1, x2])
ys = np.hstack([y0, y1, y2])
xys = np.vstack([xs, ys]).T
scattered_points = list(map(tuple, xys))
visualize_scatterplot(scattered_points, "2d_points.png")
# Run clustering script with scattered_points
clustered_filename = "2d_points_clustered.png"
cluster_points(scattered_points, clustered_filename)
print("Your plots are saved to '{}' and '{}'.".format("2d_points.png",
clustered_filename))
from IPython.display import Image
Image(filename='2d_points.png')
Image(filename='2d_points_clustered.png')
import pandas as pd
iris_data = pd.read_csv('iris.data', names=['sepal length', 'sepal width', 'petal length', 'petal width', 'species'])
# Add species ID for convenience
species_ids = np.unique(iris_data['species'], return_inverse=True)[1]
iris_data['species id'] = species_ids
iris_data
We normalize each of the features to fit them to similar ranges. This matters because the most popular implementations of k-means fit clusters to data based on distance metrics; without normalization, any given feature may have an outsized influence on cluster formation.
from sklearn.preprocessing import StandardScaler
scaled = StandardScaler().fit_transform(iris_data.loc[:,:'petal width'])
scaled = pd.DataFrame(scaled, columns=['sepal length', 'sepal width', 'petal length', 'petal width'])
scaled['species id'] = iris_data['species id']
scaled['species'] = iris_data['species']
scaled['id'] = scaled.index
iris_data = scaled
iris_data
import plotly.express as px
fig = px.scatter_3d(iris_data, x='sepal length', y='sepal width', z='petal width', color='species')
fig.show()
fig = px.scatter_3d(iris_data, x='sepal length', y='sepal width', z='petal length', color='species')
fig.show()
fig = px.scatter_3d(iris_data, x='sepal length', y='petal length', z='petal width', color='species')
fig.show()
fig = px.scatter_3d(iris_data, x='sepal width', y='petal length', z='petal width', color='species')
fig.show()
fig = px.scatter_3d(iris_data, x='sepal width', y='sepal length', z='petal width', color="petal length", symbol="species",)
fig.show()
Note that k-means assigns cluster labels arbitrarily, so the integer labels need not line up with the species ids; see the label-alignment sketch after the accuracy score below.
from sklearn.cluster import KMeans
preds = KMeans(n_clusters=3).fit_predict(iris_data.loc[:,:'petal width'])
preds
iris_data['sklearn_preds'] = preds
iris_data
fig = px.scatter_3d(iris_data, x='sepal width', y='petal length', z='petal width', color='sklearn_preds')
fig.show()
from sklearn.metrics import accuracy_score
accuracy_score(iris_data['species id'], preds)
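Because the cluster indices are arbitrary, raw accuracy against species id can be misleadingly low when the labels come back permuted. The sketch below (an illustrative helper, not part of scikit-learn, assuming scipy is available) aligns predicted labels to species ids with the Hungarian algorithm before scoring:

import numpy as np
from scipy.optimize import linear_sum_assignment
from sklearn.metrics import accuracy_score

def permutation_aligned_accuracy(y_true, y_pred):
    """Best accuracy over all one-to-one relabelings of the clusters."""
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)
    n = max(y_true.max(), y_pred.max()) + 1
    # counts[i, j] = number of points with true label i and predicted label j
    counts = np.zeros((n, n), dtype=int)
    for t, p in zip(y_true, y_pred):
        counts[t, p] += 1
    # Hungarian algorithm: choose the relabeling that maximizes matches
    rows, cols = linear_sum_assignment(-counts)
    mapping = {c: r for r, c in zip(rows, cols)}
    return accuracy_score(y_true, [mapping[p] for p in y_pred])

permutation_aligned_accuracy(iris_data['species id'], preds)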
iris_data = iris_data.drop(['sklearn_preds'],axis=1)
class Iris:
def __init__(self, sepal_length, sepal_width, petal_length, petal_width, species_id, species, idx):
self.sepal_length = sepal_length
self.sepal_width = sepal_width
self.petal_length = petal_length
self.petal_width = petal_width
self.species_id = species_id
self.species = species
self.id = idx
label = f'{sepal_length},{sepal_width},{petal_length},{petal_width}_'
self.setosa = label + "set"
self.versicolor = label + "vers"
self.virginica = label + "virg"
# Compute Euclidean distance between flowers
def get_iris_distance(flower0, flower1):
diff_sl = flower0.sepal_length - flower1.sepal_length
diff_sw = flower0.sepal_width - flower1.sepal_width
diff_pl = flower0.petal_length - flower1.petal_length
diff_pw = flower0.petal_width - flower1.petal_width
    return math.sqrt(diff_sl**2 + diff_sw**2 + diff_pl**2 + diff_pw**2)
def get_max_iris_distance(coordinates):
max_distance = 0
for i, coord0 in enumerate(coordinates[:-1]):
for coord1 in coordinates[i+1:]:
distance = get_iris_distance(coord0, coord1)
max_distance = max(max_distance, distance)
return max_distance
def get_iris_groupings(sample):
"""Grab selected items and group them by color"""
colored_points = defaultdict(list)
for label, bool_val in sample.items():
# Skip over items that were not selected
if not bool_val:
continue
        # Parse selected items
        # Note: labels look like "<sep_len>,<sep_width>,<pet_len>,<pet_width>_<species>"
        # Skip auxiliary variables introduced by dwavebinarycsp.stitch
        if len(label.split("_")) < 2:
            continue
        coord, species = label.split("_")
coord_tuple = tuple(map(float, coord.split(",")))
colored_points[species].append(coord_tuple)
return dict(colored_points)
def cluster_iris(iris_data):
flower_list = list(map(tuple, list(iris_data.to_numpy())))
flowers = [Iris(sepal_length, sepal_width, petal_length, petal_width, species_id, species, idx) for sepal_length, sepal_width, petal_length, petal_width, species_id, species, idx in flower_list]
max_distance = max(get_max_iris_distance(flowers), 1)
# Build constraints
csp = dwavebinarycsp.ConstraintSatisfactionProblem(dwavebinarycsp.BINARY)
    # Apply constraint: each flower can belong to only one species group
choose_one_group = {(0, 0, 1), (0, 1, 0), (1, 0, 0)}
for iris in flowers:
csp.add_constraint(choose_one_group, (iris.setosa, iris.versicolor, iris.virginica))
# Build initial BQM
bqm = dwavebinarycsp.stitch(csp, min_classical_gap=3)
# Edit BQM to bias for close together points to share the same color
for i, flower0 in enumerate(flowers[:-1]):
for flower1 in flowers[i+1:]:
# Set up weight
d = get_iris_distance(flower0, flower1) / max_distance # rescale distance
same_weight = -math.cos(d*math.pi)
# Apply weights to BQM
bqm.add_interaction(flower0.setosa, flower1.setosa, same_weight)
bqm.add_interaction(flower0.versicolor, flower1.versicolor, same_weight)
bqm.add_interaction(flower0.virginica, flower1.virginica, same_weight)
diff_weight = -math.tanh(d) * 0.5
# Apply weights to BQM
bqm.add_interaction(flower0.setosa, flower1.virginica, diff_weight)
bqm.add_interaction(flower0.setosa, flower1.versicolor, diff_weight)
bqm.add_interaction(flower0.virginica, flower1.setosa, diff_weight)
bqm.add_interaction(flower0.virginica, flower1.versicolor, diff_weight)
bqm.add_interaction(flower0.versicolor, flower1.setosa, diff_weight)
bqm.add_interaction(flower0.versicolor, flower1.virginica, diff_weight)
# Submit problem to D-Wave sampler
sampler = EmbeddingComposite(DWaveSampler())
sampleset = sampler.sample(bqm, chain_strength=8, num_reads=1000)
best_sample = sampleset.first.sample
# Visualize graph problem
dwave.inspector.show(bqm, sampleset)
groupings = get_iris_groupings(best_sample)
    # Return the groupings
    # Note: this is simply a more compact version of 'best_sample'
    return groupings
test = iris_data.loc[:3, :].append(iris_data.loc[60:63, :]).append(iris_data.loc[120:123, :])
test
groups = cluster_iris(test)
results = pd.DataFrame()
for species, members in groups.items():
    for flower in members:
        results = results.append({'sepal length': flower[0], 'sepal width': flower[1], 'petal length': flower[2], 'petal width': flower[3], 'cluster': species}, ignore_index=True)
results
merged = results.merge(test, how="inner", on=["petal length", "petal width", "sepal length", "sepal width"])
merged = merged.replace('set',0).replace('vers',1).replace('virg',2)
merged
accuracy_score(merged['species id'], merged['cluster'])
fig = px.scatter_3d(results, x='sepal width', y='sepal length', z='petal width', color="cluster",)
fig.show()
fig = px.scatter_3d(results, x='sepal width', y='sepal length', z='petal length', color="cluster",)
fig.show()
fig = px.scatter_3d(results, x='sepal length', y='petal width', z='petal length', color="cluster",)
fig.show()
fig = px.scatter_3d(results, x='sepal width', y='petal width', z='petal length', color="cluster",)
fig.show()
Will selecting important features allow us to form larger clusters with greater stability?
class TinyIris:
def __init__(self, sepal_length, petal_width):
self.sepal_length = sepal_length
self.petal_width = petal_width
label = f'{sepal_length},{petal_width}_'
self.setosa = label + "set"
self.versicolor = label + "vers"
self.virginica = label + "virg"
# Compute Euclidean distance between flowers
def get_tiny_iris_distance(flower0, flower1):
diff_sl = flower0.sepal_length - flower1.sepal_length
diff_pw = flower0.petal_width - flower1.petal_width
    return math.sqrt(diff_sl**2 + diff_pw**2)
def get_max_tiny_iris_distance(coordinates):
max_distance = 0
for i, coord0 in enumerate(coordinates[:-1]):
for coord1 in coordinates[i+1:]:
distance = get_tiny_iris_distance(coord0, coord1)
max_distance = max(max_distance, distance)
return max_distance
def cluster_tiny_iris(iris_data):
flower_list = list(map(tuple, list(iris_data.to_numpy())))
flowers = [TinyIris(sepal_length, petal_width) for sepal_length, sepal_width, petal_length, petal_width, species_id, species, idx in flower_list]
    max_distance = max(get_max_tiny_iris_distance(flowers), 1)
# Build constraints
csp = dwavebinarycsp.ConstraintSatisfactionProblem(dwavebinarycsp.BINARY)
    # Apply constraint: each flower can belong to only one species group
choose_one_group = {(0, 0, 1), (0, 1, 0), (1, 0, 0)}
for iris in flowers:
csp.add_constraint(choose_one_group, (iris.setosa, iris.versicolor, iris.virginica))
# Build initial BQM
bqm = dwavebinarycsp.stitch(csp, min_classical_gap=3)
# Edit BQM to bias for close together points to share the same color
for i, flower0 in enumerate(flowers[:-1]):
for flower1 in flowers[i+1:]:
# Set up weight
d = get_tiny_iris_distance(flower0, flower1) / max_distance # rescale distance
same_weight = -math.cos(d*math.pi)
# Apply weights to BQM
bqm.add_interaction(flower0.setosa, flower1.setosa, same_weight)
bqm.add_interaction(flower0.versicolor, flower1.versicolor, same_weight)
bqm.add_interaction(flower0.virginica, flower1.virginica, same_weight)
diff_weight = -math.tanh(d) * 0.5
# Apply weights to BQM
bqm.add_interaction(flower0.setosa, flower1.virginica, diff_weight)
bqm.add_interaction(flower0.setosa, flower1.versicolor, diff_weight)
bqm.add_interaction(flower0.virginica, flower1.setosa, diff_weight)
bqm.add_interaction(flower0.virginica, flower1.versicolor, diff_weight)
bqm.add_interaction(flower0.versicolor, flower1.setosa, diff_weight)
bqm.add_interaction(flower0.versicolor, flower1.virginica, diff_weight)
# Submit problem to D-Wave sampler
sampler = EmbeddingComposite(DWaveSampler())
sampleset = sampler.sample(bqm, chain_strength=8, num_reads=1000)
best_sample = sampleset.first.sample
# Visualize graph problem
dwave.inspector.show(bqm, sampleset)
groupings = get_iris_groupings(best_sample)
    # Return the groupings
    # Note: this is simply a more compact version of 'best_sample'
    return groupings
tiny_test = iris_data.loc[:5, :].append(iris_data.loc[60:65, :]).append(iris_data.loc[120:125, :])
tiny_test
tiny_group = cluster_tiny_iris(tiny_test)
tiny_results = pd.DataFrame()
for species, members in tiny_group.items():
    for flower in members:
        tiny_results = tiny_results.append({'sepal length': flower[0], 'petal width': flower[1], 'cluster': species}, ignore_index=True)
tiny_results = tiny_results.drop_duplicates()
tiny_results
tiny_merged = tiny_results.merge(tiny_test, how="left", on=["petal width", "sepal length"])
tiny_merged = tiny_merged.replace('set',0).replace('vers',1).replace('virg',2)
tiny_merged
accuracy_score(tiny_merged['species id'], tiny_merged['cluster'])