网站建设公司宣传,专业做网站优化,wordpress只能本地访问,作品提示优化要删吗【2023年中国高校大数据挑战赛 】赛题 B DNA 存储中的序列聚类与比对 Python实现
更新时间#xff1a;2023-12-29
1 题目
赛题 B DNA 存储中的序列聚类与比对
近年来#xff0c;随着新互联网设备的大量涌入和对其服务需求的指数级增长#xff0c;越来越多的数据信息被产…【2023年中国高校大数据挑战赛 】赛题 B DNA 存储中的序列聚类与比对 Python实现
更新时间2023-12-29
1 题目
赛题 B DNA 存储中的序列聚类与比对
近年来随着新互联网设备的大量涌入和对其服务需求的指数级增长越来越多的数据信息被产生与收集。预计到 2021 年数据中心内部的IP流量将达到14.7 ZB数据中心之间的流量将达到 2.8 ZB。如何储存与运输如此庞大的数据已经成为了难题。DNA存储技术是一项着眼于未来的具有划时代意义存储技术正成为应对数据爆炸的关键技术之一。DNA存储技术指的是使用人工合成的脱氧核糖核苷酸DNA作为介质进行信息存储的技术其具有理论存储量大、维护方便的优点。具体来说DNA存储将计算机的二进制信息转换为四种碱基腺嘌呤A、胸腺嘧啶T、鸟嘌呤G和胞嘧啶C组成的DNA序列相当于转换为四进制之后合成为DNA分子干粉。需要读取信息时将DNA分子进行PCR扩增这步将会使得原有DNA序列进行扩增复制之后使用测序仪测出DNA信息。然而在合成、测序等阶段会存在一定的错误有概率随机发生碱基删除、增添或者替换。下图是某个序列合成测序后的示意图可以看出由于发生了碱基删除、增添和替换进而将ATGCATGC变成了AGCAATTC 因此对于我们设计好的DNA序列实际生产测序出来后的序列会存在以下差异 测序后的序列将比原始序列的数量多很多因为原始序列会被随机扩增成很多条。 测序后的序列相比于原始序列有可能存在错误包括某个碱基缺失、替换、或添加了某个未知碱基甚至会出现断链。
针对以上两个特点目前往往需要对测序后的序列进行聚类与比对。其中聚类指的是将测序序列聚类以判断原始序列有多少条聚类后相同类的序列定义为一个簇。比对则是指在聚类基础上对一个簇内的序列进行比对进而输出一条最有可能的正确序列。通过聚类与比对将会极大地恢复原始序列的信息但需要注意由于DNA测序后序列众多如何高效地进行聚类与比对则是在满足准确率基础上的另一大难点。
“train_reference.txt”是某次合成的目标序列其中第一行为序号第二行为序列内容。通过真实合成、测序后读取到的测序序列文件为“train_reads.txt”我们已经对测序序列进行了分类该文件第一行为目标序列的序号第二行为序列内容。
基于赛题提供的数据自主查阅资料选择合适的方法完成如下任务
**任务 1**观察数据集“train_reads.txt”、“train_reference.txt”针对这次合成任务进行错误率插入、删除、替换、断链、拷贝数方面的分析。其中错误率定义为某个碱基发生错误的概率需要对不同类型的错误率分别进行分析。拷贝数定义为原始序列复制的数量。
**任务 2**设计开发一种模型用于对测序后的序列“train_reads.txt”进行聚类并根据“train_reads.txt”的标签验证模型准确性。模型主要从两方面评估效果
1聚类后准确性包括簇的数量以及簇内纯度、2聚类速度以分钟为单位。
任务 3 “test_reads.txt”是我们在另一种合成环境下合成的测序文件与 “train_reads.txt”的目标序列不相同请用任务 2 所开发的模型对其进行聚类给出聚类耗时以及“test_reads.txt”的目标序列数量给出拷贝数分布图。
任务 4 聚类后能否通过比对恢复原始信息也是极为关键的设计开发一种用于同簇序列的比对模型该模型可以针对同簇的DNA序列进行比对并输出最有可能正确的目标序列。 请使用该工具对任务 3 中“test_reads.txt”的聚类后序列进行比对并输出“test_reads.txt”最有可能的目标序列并分析“test_reads.txt”的错误率。请用一个“test_ref.txt”的文件记录“test_reads.txt”的目标序列文件内序列的形式为
AAAA…… AAAT…… AATA…… …… CCCC……
即序列只用回车间隔不需要加其他符号序列顺序按照从前到后ATGC依次的顺序。此外需要在论文中展示前十条目标序列的聚类结果。
附件 1train_reference.txt train数据集的正确序列 附件 2train_reads.txt train数据集的合成测序后序列 附件 3test_reads.txt test数据集的合成测序后序列
参考文献 Dong Y, Sun F, Ping Z, et al. DNA storage: research landscape and future prospects[J]. National Science Review, 2020, 7(6): 1092-1107. Fu L, Niu B, Zhu Z, et al. CD-HIT: accelerated for clustering the next-generation sequencing data[J]. Bioinformatics, 2012, 28(23): 3150-3152.
2 问题分析
2.1 问题一
定义一个函数来比较两个字符串序列可以自己写for循环去比较也可以使用字符串比较工具SequenceMatcher。
2.2 问题二
DNA序列的聚类可以采用基于字符串相似度的聚类方法比如Levenshtein、SMITH-WATERMAN、N-gram方法、或基于序列编码如k-mer计数的机器学习聚类方法。
2.3 问题三
在问题二的基础上对train_reads.xt和test_reads文件和k-mer词频矩阵进行聚类分析以判断原始序列有多少条。统计每个簇中的序列数量得到拷贝数分布图。
2.4 问题四
1同簇的DNA序列比对方法对每个簇中的序列进行多数投票多数序列中出现的碱基将被选为最终序列的对应位置的碱基.
2对于每个聚类簇进行列方向的比对也就是对于序列的每个位置从属于该簇的所有序列中选取每个位置上最常出现的碱基作为该位置的最终碱基。
3对多数投票的结果进一步进行相似性评分比较每个簇的共识序列从投票中获得的序列与引用序列库理想的序列中的序列。
4对于找到的共识序列将其结果按照聚类簇的索引排序并输出以方便与目标序列文件(“test_ref.txt”)进行比对来确定错误位置和错误率。
5改进角度使用更加复杂的比对算法例如全局比对、局部比对算法、Smith-Waterman、Needleman-Wunsch算法这些算法考虑了插入、删除和替换并能够为每种类型的差错提供权重。
3 Python实现
3.1 问题一
import pandas as pd
from difflib import SequenceMatcher
from collections import Counter
from pyecharts.charts import Bar, Pie
from pyecharts import options as opts# 读取目标序列文件和测序序列文件
reference_seq_s pd.read_csv(data/train_reference.txt,sep ,names[ID,DNA_ref])
reads pd.read_csv(data/train_reads.txt,sep ,names[ID,DNA])
merged_df pd.merge(reference_seq_s, reads, onID, howinner)# 初始化统计变量
insertion_errors 0
deletion_errors 0
replacement_errors 0
chain_breaks 0
copy_numbers Counter()# 定义一个函数来比较两个序列并统计不同类型的错误
def analyze_sequence(ref_seq, test_seq):global insertion_errors, deletion_errors, replacement_errors, chain_breaks# 略for tag, i1, i2, j1, j2 in s.get_opcodes():if tag replace:replacement_errors max(i2 - i1, j2 - j1)elif tag delete:deletion_errors (i2 - i1)elif tag insert:insertion_errors (j2 - j1)elif tag equal:pass # No errorif len(ref_seq) ! len(test_seq):chain_breaks 1# 进行错误统计和拷贝数计算
for index, row in merged_df.iterrows():analyze_sequence(row[DNA_ref], row[DNA])copy_numbers[row[ID]] 1 # 总的测序次数
total_reads len(merged_df)# 绘制错误率和拷贝数统计图
def create_charts():# 错误率统计图error_bar (Bar(init_optsopts.InitOpts(width700px, height500px)).add_xaxis([Insertion, Deletion, Replacement, Chain Breaks]).add_yaxis(Errors, [insertion_errors, deletion_errors, replacement_errors, chain_breaks]).set_global_opts(title_optsopts.TitleOpts(titleDNA Sequence Errors)))# 拷贝数统计图copy_num_pie (Pie(init_optsopts.InitOpts(width700px, height500px)).add(,[list(z) for z in zip([str(id) for id in copy_numbers.keys()], copy_numbers.values())],radius[40%, 75%],).set_global_opts(title_optsopts.TitleOpts(titleDNA Sequence Copy Numbers),legend_optsopts.LegendOpts(orientvertical, pos_top15%, pos_left2%),).set_series_opts(label_optsopts.LabelOpts(formatter{b}: {c})))return error_bar, copy_num_pie# 创建和渲染图表
error_bar, copy_num_pie create_charts()
error_bar.render(breakdown_of_errors.html)
copy_num_pie.render(dna_copy_numbers.html) 3.2 问题二
方法一基于Levenshtein距离的聚类算法
import pandas as pd
from sklearn.cluster import AgglomerativeClustering
import Levenshtein
import time# 读取数据
reference_seq_s pd.read_csv(data/train_reference.txt, sep , names[ID, DNA_ref])
reads pd.read_csv(data/train_reads.txt, sep , names[ID, DNA])# 计算Levenshtein距离矩阵由于计算量大这里只示范计算前n个序列的距离矩阵
n len(reads)
distance_matrix [[0] * n for _ in range(n)]
for i in range(n):for j in range(i1, n):略。。。# 聚类
start_time time.time()
clustering_model AgglomerativeClustering(affinityprecomputed, linkagecomplete, n_clustersNone, distance_threshold1.0)
clustering_model.fit(distance_matrix)
duration time.time() - start_time# 评估聚类结果这里计算不同簇的数量
clusters clustering_model.labels_
cluster_counts pd.Series(clusters).value_counts()import numpy as np
import matplotlib.pyplot as plt
from scipy.cluster.hierarchy import dendrogram# 画出树状图
def plot_dendrogram(model, **kwargs):children model.children_distance np.arange(children.shape[0])no_of_observations np.arange(2, children.shape[0]2)linkage_matrix np.column_stack([children, distance, no_of_observations]).astype(float)dendrogram(linkage_matrix, **kwargs)plt.figure(figsize(15, 8))
plot_dendrogram(clustering_model, labelsrange(len(reads)))
plt.ylabel(Distance)
plt.savefig(img/层次聚类.png,dpi100)
plt.show() 方法二基于SMITH-WATERMAN算法的聚类 import pandas as pd
import numpy as np
from sklearn.cluster import AgglomerativeClustering
import matplotlib.pyplot as plt
import itertools
# from Bio import pairwise2# 数据读取
reference_seq_s pd.read_csv(data/train_reference.txt, sep , names[ID,DNA_ref])
reads pd.read_csv(data/train_reads.txt, sep , names[ID,DNA])# SMITH-WATERMAN算法的实现
def smith_waterman_alignment(s1, s2, match_score3, gap_cost2):# 初始化得分矩阵A np.zeros((len(s1) 1, len(s2) 1), int)for i, j in itertools.product(range(1, A.shape[0]), range(1, A.shape[1])):match A[i - 1, j - 1] (match_score if s1[i - 1] s2[j - 1] else -match_score)delete A[i - 1, j] - gap_costinsert A[i, j - 1] - gap_costA[i, j] max(match, delete, insert, 0)return np.max(A)# 编辑距离矩阵的计算
def compute_distance_matrix(reads):n_reads len(reads)distance_matrix np.zeros((n_reads, n_reads))for i in range(n_reads):for j in range(i1, n_reads):alignment_score smith_waterman_alignment(reads[i], reads[j])distance_matrix[i, j] distance_matrix[j, i] alignment_score # we use alignment score directly herereturn distance_matrix# Run SMITH-WATERMAN on the dataset
distance_matrix compute_distance_matrix(reads[DNA].values)# 聚类算法
def cluster_sequences(distance_matrix, n_clusters2):# 使用层次聚类可以使用其他聚类算法clustering AgglomerativeClustering(n_clustersn_clusters, affinityprecomputed, linkagecomplete)# 使用 1 减 距离矩阵是为了将距离转化为相似度clustering.fit(1 - distance_matrix)return clustering.labels_
# 聚类和评估
cluster_labels cluster_sequences(distance_matrix)
reads[Cluster] cluster_labels# 评估簇的纯度
def evaluate_cluster_purity(cluster_labels, actual_labels):contingency_table pd.crosstab(cluster_labels, actual_labels)purity np.sum(np.max(contingency_table, axis0)) / np.sum(contingency_table.sum())return purity# 可视化
def visualize_clustering(reads, cluster_labels):plt.figure(figsize(12, 8))colors [r, g, b, y, c, m]for i in np.unique(cluster_labels):plt.plot(reads[reads[Cluster] i][DNA].index, [i] * sum(reads[Cluster] i), x, colorcolors[i % len(colors)], labelfCluster {i})plt.title(Clustering of DNA sequences)plt.xlabel(Sequence Index)plt.ylabel(Cluster ID)plt.legend()plt.show()visualize_clustering(reads, cluster_labels)
方法三对测序序列进行k-mer编码。使用CountVectorizer把序列的k-mer列表转换成词频term frequency矩阵。使用K-means算法对k-mer词频矩阵进行聚类聚类数设置为原始序列数。
import pandas as pd
import numpy as np
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from sklearn.feature_extraction.text import CountVectorizer
import matplotlib.pyplot as plt
import time# 读取数据
reference_seq_s pd.read_csv(data/train_reference.txt, sep , names[ID, DNA_ref])
reads pd.read_csv(data/train_reads.txt, sep , names[ID, DNA])# k-mer计数函数
def get_kmers(sequence, k3):return [sequence[x:xk] for x in range(len(sequence) 1 - k)]reads[kmers] reads[DNA].apply(lambda x: get_kmers(x))# 将k-mer列表转换为字符串以便进一步转换为向量
reads[kmers_str] reads[kmers].apply(lambda x: .join(x))# 使用CountVectorizer生成k-mer的词频矩阵
vectorizer CountVectorizer()
X vectorizer.fit_transform(reads[kmers_str])# PCA降维
pca PCA(n_components2)
X_reduced pca.fit_transform(X.toarray())# KMeans聚类
# 确定簇的数量为原始序列数
n_clusters len(reference_seq_s[ID].unique())
kmeans KMeans(n_clustersn_clusters)start_time time.time()# 训练模型
kmeans.fit(X)
end_time time.time()# 计算总耗时
total_time (end_time - start_time) / 60
print(聚类时间{:.2f} minutes..format(total_time))labels kmeans.labels_
reads[cluster] labels
# 可视化结果
plt.figure(figsize(8, 6))
plt.scatter(X_reduced[:, 0], X_reduced[:, 1], clabels, cmaprainbow, alpha0.6, edgecolorsw, s50)
plt.savefig(img/k-cluster.png,dpi100)
plt.show() 3.3 问题三
import pandas as pd
from sklearn.cluster import KMeans
from sklearn.feature_extraction.text import CountVectorizer
import pyecharts.options as opts
from pyecharts.charts import Bar
import time# k-mer计数函数
def get_kmers(sequence, k3):return [sequence[x:xk] for x in range(len(sequence) 1 - k)]
# 读取数据
# reference_seq_s pd.read_csv(data/train_reference.txt, sep , names[ID, DNA])
reads pd.read_csv(data/train_reads.txt, sep , names[ID, DNA])
test_reads pd.read_csv(data/test_reads.txt,names[DNA])reads[kmers] reads[DNA].apply(lambda x: get_kmers(x))
# 将k-mer列表转换为字符串以便进一步转换为向量
reads[kmers_str] reads[kmers].apply(lambda x: .join(x))
# 应用k-mer处理
test_reads[kmers] test_reads[DNA].apply(lambda x: get_kmers(x))
test_reads[kmers_str] test_reads[kmers].apply(lambda x: .join(x))# 使用CountVectorizer生成k-mer的词频矩阵
vectorizer CountVectorizer()
# 先拟合训练数据
X_train vectorizer.fit_transform(reads[kmers_str])
# 再转换测试数据
X_test vectorizer.transform(test_reads[kmers_str])from sklearn.decomposition import PCA
# 用PCA降维以便可视化仅用于降维和可视化并不用于聚类
pca PCA(n_components2)
X_reduced pca.fit_transform(X_train.toarray())
# KMeans聚类
start_time time.time()
n_clusters len(reads[ID].unique())
kmeans KMeans(n_clustersn_clusters)
# 训练模型
kmeans.fit(X_train)
clusters kmeans.fit_predict(X_test)
end_time time.time()
# 输出聚类耗时
print(fClustering Time: {end_time - start_time})# 统计每个簇的拷贝数
cluster_counts pd.Series(clusters).value_counts().sort_index()3.4 问题四
1方法一
from sklearn.decomposition import PCA
import pandas as pd
from sklearn.cluster import KMeans
from sklearn.feature_extraction.text import CountVectorizer
import time# k-mer计数函数
def get_kmers(sequence, k3):return [sequence[x:xk] for x in range(len(sequence) 1 - k)]
# 读取数据
# reference_seq_s pd.read_csv(data/train_reference.txt, sep , names[ID, DNA])
reads pd.read_csv(data/train_reads.txt, sep , names[ID, DNA])
test_reads pd.read_csv(data/test_reads.txt,names[DNA])reads[kmers] reads[DNA].apply(lambda x: get_kmers(x))
# 将k-mer列表转换为字符串以便进一步转换为向量
reads[kmers_str] reads[kmers].apply(lambda x: .join(x))
# 应用k-mer处理
test_reads[kmers] test_reads[DNA].apply(lambda x: get_kmers(x))
test_reads[kmers_str] test_reads[kmers].apply(lambda x: .join(x))
# 使用CountVectorizer生成k-mer的词频矩阵
vectorizer CountVectorizer()
# 先拟合训练数据
X_train vectorizer.fit_transform(reads[kmers_str])
# 再转换测试数据
X_test vectorizer.transform(test_reads[kmers_str])
# 用PCA降维以便可视化仅用于降维和可视化并不用于聚类
pca PCA(n_components2)
X_reduced pca.fit_transform(X_train.toarray())
# KMeans聚类
start_time time.time()
n_clusters len(reads[ID].unique())
kmeans KMeans(n_clustersn_clusters)
# 训练模型
kmeans.fit(X_train)
clusters kmeans.fit_predict(X_test)# 比对模型的Python代码实现
import numpy as np
from collections import Counter
from typing import List# 函数来计算多数投票后确定的序列
def consensus_sequence(seqs: List[str]) - str:采取多数投票法返回一个列表中最可能正确的目标序列。:param seqs: 需要进行多数投票的一系列序列:return: 最可能正确的目标序列# 将序列转置以方便进行列方向投票transposed_seqs list(zip(*seqs))consensus_seq []# 对于每个位置计算最常见的元素for column in transposed_seqs:counter Counter(column)most_common counter.most_common(1)[0][0]consensus_seq.append(most_common)return .join(consensus_seq)# 根据聚类结果对序列进行聚类
clustered_seqs {} # 存储每个原始序列ID对应的所有序列
# 对测试数据聚类
for idx, cluster_id in enumerate(clusters):if cluster_id not in clustered_seqs:clustered_seqs[cluster_id] []clustered_seqs[cluster_id].append(test_reads[DNA][idx])# 对于每个聚类进行比对并确定共识序列
consensus_seqs {}
for cluster_id, seqs in clustered_seqs.items():consensus consensus_sequence(seqs)consensus_seqs[cluster_id] consensus
# 评估聚类质量和恢复的序列质量
reference_seqs pd.read_csv(data/train_reference.txt, sep , names[ID, DNA])# 评估聚类质量和恢复的序列质量
reference_seqs pd.read_csv(data/train_reference.txt, sep , names[ID, DNA])
# 计算共识序列与目标序列的错误率
def calculate_error_rate(original_seq: str, new_seq: str) - float:计算恢复的序列与目标序列之间的错误率。:param original_seq: 原序列:param new_seq: 恢复的序列:return: 错误率errors sum(1 for orig, new in zip(original_seq, new_seq) if orig ! new)return errors / len(original_seq)# 错误率列表
error_rates []
# 输出最可能正确的序列并计算错误率
for cluster_id, cons_seq in sorted(consensus_seqs.items()):original_seq reference_seqs.loc[cluster_id,DNA]error_rate calculate_error_rate(original_seq, cons_seq)error_rates.append(error_rate)print(fCluster {cluster_id} Consensus: {cons_seq}, Error Rate: {error_rate})# 分析总体错误率
overall_error_rate np.mean(error_rates)
print(f总体错误率: {overall_error_rate})总体错误率0.509 2方法二
from sklearn.decomposition import PCA
import pandas as pd
from sklearn.cluster import KMeans
from sklearn.feature_extraction.text import CountVectorizer
import time# k-mer计数函数
def get_kmers(sequence, k3):return [sequence[x:xk] for x in range(len(sequence) 1 - k)]
# 读取数据
# reference_seq_s pd.read_csv(data/train_reference.txt, sep , names[ID, DNA])
reads pd.read_csv(data/train_reads.txt, sep , names[ID, DNA])
test_reads pd.read_csv(data/test_reads.txt,names[DNA])reads[kmers] reads[DNA].apply(lambda x: get_kmers(x))
# 将k-mer列表转换为字符串以便进一步转换为向量
reads[kmers_str] reads[kmers].apply(lambda x: .join(x))
# 应用k-mer处理
test_reads[kmers] test_reads[DNA].apply(lambda x: get_kmers(x))
test_reads[kmers_str] test_reads[kmers].apply(lambda x: .join(x))
# 使用CountVectorizer生成k-mer的词频矩阵
vectorizer CountVectorizer()
# 先拟合训练数据
X_train vectorizer.fit_transform(reads[kmers_str])
# 再转换测试数据
X_test vectorizer.transform(test_reads[kmers_str])
# 用PCA降维以便可视化仅用于降维和可视化并不用于聚类
pca PCA(n_components2)
X_reduced pca.fit_transform(X_train.toarray())
# KMeans聚类
start_time time.time()
n_clusters len(reads[ID].unique())
kmeans KMeans(n_clustersn_clusters)
# 训练模型
kmeans.fit(X_train)
clusters kmeans.fit_predict(X_test)import numpy as np
import pandas as pd
from collections import Counter# Needleman-Wunsch算法实现
def needleman_wunsch(seq1, seq2, match_score1, gap_cost1, mismatch_cost1):n len(seq1)m len(seq2)score_matrix np.zeros((n1, m1))# Initialize score matrix and traceback pathsfor i in range(n1):score_matrix[i][0] -i * gap_costfor j in range(m1):score_matrix[0][j] -j * gap_cost# Fill in score matrixfor i in range(1, n1):for j in range(1, m1):if seq1[i-1] seq2[j-1]:match score_matrix[i-1][j-1] match_scoreelse:match score_matrix[i-1][j-1] - mismatch_costdelete score_matrix[i-1][j] - gap_costinsert score_matrix[i][j-1] - gap_costscore_matrix[i][j] max(match, delete, insert)# Traceback to compute the alignmentalign1 align2 i nj mwhile i 0 and j 0:score_current score_matrix[i][j]score_diagonal score_matrix[i-1][j-1]score_up score_matrix[i][j-1]score_left score_matrix[i-1][j]if score_current score_diagonal (match_score if seq1[i-1] seq2[j-1] else -mismatch_cost):align1 seq1[i-1]align2 seq2[j-1]i - 1j - 1elif score_current score_left - gap_cost:align1 seq1[i-1]align2 -i - 1elif score_current score_up - gap_cost:align1 -align2 seq2[j-1]j - 1while i 0:align1 seq1[i-1]align2 -i - 1while j 0:align1 -align2 seq2[j-1]j - 1return align1[::-1], align2[::-1]# 从聚类结果中恢复出最可能的序列
def recover_sequence(cluster_seqs):# 序列长度可能不同先找到最长的序列长度略return consensus_sequencefrom functools import reduce
# 使用先前完成的KMeans结果clusters
# 假设clusters为序列的聚类结果test_reads为相应的序列数据
cluster_dict {i: [] for i in range(n_clusters)}
for i, cluster in enumerate(clusters):cluster_dict[cluster].append(test_reads[DNA][i])# 对每个簇进行比对并且输出最可能正确的序列
consensus_sequences {}
for cluster_id, seqs in cluster_dict.items():if len(seqs) 1:# 使用reduce函数将同簇序列两两比对consensus reduce(lambda x, y: recover_sequence([x, y]), seqs)else:# 如果簇内只有一个序列则将其作为最可能的序列consensus seqs[0]consensus_sequences[cluster_id] consensus# 将得到的“最可能正确的序列”写入到文件
with open(data/test_ref.txt, w) as f_out:for seq in consensus_sequences.values():f_out.write(seq \n) 4 完整代码
请看名片扣我