"""
* Created with PyCharm
* 作者: 阿光
* 日期: 2022/1/14
* 时间: 16:15
* 描述:
"""
import random
import re
import tarfile
import numpy as np
import requests

def download():
    # Fetch the IMDB corpus archive and save it to the working directory.
    corpus_url = "https://dataset.bj.bcebos.com/imdb%2FaclImdb_v1.tar.gz"
    web_request = requests.get(corpus_url)
    corpus = web_request.content
    with open("./aclImdb_v1.tar.gz", "wb") as f:
        f.write(corpus)  # the with-block closes the file automatically
# download()  # uncomment to fetch the archive on the first run

def load_imdb(is_training):
    data_set = []
    for label in ["pos", "neg"]:
        with tarfile.open("./aclImdb_v1.tar.gz") as tarf:
            # Match only the .txt review files for the requested split/label.
            path_pattern = (r"aclImdb/train/" + label + r"/.*\.txt$" if is_training
                            else r"aclImdb/test/" + label + r"/.*\.txt$")
            path_pattern = re.compile(path_pattern)
            tf = tarf.next()
            while tf is not None:
                if path_pattern.match(tf.name):
                    sentence = tarf.extractfile(tf).read().decode()
                    sentence_label = 0 if label == 'neg' else 1
                    data_set.append((sentence, sentence_label))
                tf = tarf.next()
    return data_set

def data_preprocess(corpus):
    data_set = []
    for sentence, sentence_label in corpus:
        # Lowercase and split on spaces; punctuation stays attached to words.
        sentence = sentence.strip().lower()
        sentence = sentence.split(" ")
        data_set.append((sentence, sentence_label))
    return data_set

# Build the vocabulary: count each word's frequency and map each word to an
# integer id by descending frequency.
def build_dict(corpus):
    word_freq_dict = dict()
    for sentence, _ in corpus:
        for word in sentence:
            if word not in word_freq_dict:
                word_freq_dict[word] = 0
            word_freq_dict[word] += 1
    # Sort by frequency, highest first, so frequent words get small ids.
    word_freq_dict = sorted(word_freq_dict.items(), key=lambda x: x[1], reverse=True)
    word2id_dict = dict()
    word2id_freq = dict()
    # Reserve id 0 for out-of-vocabulary words and id 1 for padding.
    word2id_dict['[oov]'] = 0
    word2id_freq[0] = 1e10
    word2id_dict['[pad]'] = 1
    word2id_freq[1] = 1e10
    for word, freq in word_freq_dict:
        word2id_dict[word] = len(word2id_dict)
        word2id_freq[word2id_dict[word]] = freq
    return word2id_freq, word2id_dict
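
# A minimal sanity check (illustrative only; the toy corpus below is an
# assumption, not part of the original pipeline): the two reserved ids come
# first, then words are numbered by descending frequency.
#
#   _, w2id = build_dict([(["good", "good", "bad"], 1)])
#   w2id  ->  {'[oov]': 0, '[pad]': 1, 'good': 2, 'bad': 3}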

# Convert the corpus into sequences of word ids.
def convert_corpus_to_id(corpus, word2id_dict):
    data_set = []
    for sentence, sentence_label in corpus:
        # Words missing from the vocabulary map to the [oov] id.
        sentence = [word2id_dict[word] if word in word2id_dict
                    else word2id_dict['[oov]'] for word in sentence]
        data_set.append((sentence, sentence_label))
    return data_set

# Batch iterator: each call yields a new batch for training or prediction.
def build_batch(word2id_dict, corpus, batch_size, epoch_num, max_seq_len, shuffle=True):
    sentence_batch = []
    sentence_label_batch = []
    for _ in range(epoch_num):
        if shuffle:
            random.shuffle(corpus)
        for sentence, sentence_label in corpus:
            # Truncate to max_seq_len, then pad short sentences with [pad].
            sentence_sample = sentence[:min(max_seq_len, len(sentence))]
            if len(sentence_sample) < max_seq_len:
                for _ in range(max_seq_len - len(sentence_sample)):
                    sentence_sample.append(word2id_dict['[pad]'])
            sentence_batch.append(sentence_sample)
            sentence_label_batch.append([sentence_label])
            if len(sentence_batch) == batch_size:
                yield np.array(sentence_batch).astype("int64"), np.array(sentence_label_batch).astype("int64")
                sentence_batch = []
                sentence_label_batch = []
    # Yield whatever is left over as a final, possibly smaller, batch.
    if len(sentence_batch) > 0:
        yield np.array(sentence_batch).astype("int64"), np.array(sentence_label_batch).astype("int64")
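
# Quick illustration of the truncation/padding behavior (toy values, assumed
# for demonstration): with max_seq_len=5, the sentence [5, 6, 7] becomes
# [5, 6, 7, 1, 1], where 1 is the [pad] id.
#
#   batches = build_batch({'[pad]': 1}, [([5, 6, 7], 0)] * 4,
#                         batch_size=4, epoch_num=1, max_seq_len=5)
#   next(batches)[0].shape  ->  (4, 5)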

def get_data():
    train_corpus = load_imdb(True)
    test_corpus = load_imdb(False)
    train_corpus = data_preprocess(train_corpus)
    test_corpus = data_preprocess(test_corpus)
    # The vocabulary is built from the training split only.
    word2id_freq, word2id_dict = build_dict(train_corpus)
    vocab_size = len(word2id_freq)
    train_corpus = convert_corpus_to_id(train_corpus, word2id_dict)
    test_corpus = convert_corpus_to_id(test_corpus, word2id_dict)
    # Batch only the first 1000 training samples here.
    train_datasets = build_batch(word2id_dict,
                                 train_corpus[:1000], batch_size=64, epoch_num=64, max_seq_len=30)
    return train_datasets
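
# Minimal usage sketch (assumes the archive was already fetched via
# download() above): pull one batch and check its shapes.
if __name__ == "__main__":
    train_datasets = get_data()
    sentences, labels = next(train_datasets)
    print(sentences.shape, labels.shape)  # expected: (64, 30) (64, 1)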