65.9K
CodeProject 正在变化。 阅读更多。
Home

破解 Alexa 的语音录音

starIconstarIconstarIconstarIconstarIcon

5.00/5 (14投票s)

2019年11月7日

CPOL

4分钟阅读

viewsIcon

49541

downloadIcon

558

现在您可以存储亚马逊保留但未向客户提供的自己的语音录音。

引言

我拥有两个亚马逊 Echo 设备(其中一个给了我正在国外上大学的女儿)。亚马逊最近 确认,亚马逊 Alexa 智能助手的用户产生的语音录音将被永久保存,除非用户手动删除。在深入研究了这个问题后,我试图找到一种下载我数据的方法。向亚马逊提交正式请求后,我收到一封“批准”我的请求的电子邮件,但我的任何录音都没有包含在数据中……在咨询客服后,我被告知只能收听或删除我的录音,但没有下载的选项。换句话说,如果您使用亚马逊 Alexa 设备,亚马逊会保存您所有的录音文件,但您无法获取它们。好了,现在您可以通过我们开发的 Python 脚本来做到了,这个脚本就能实现这一点。

背景

如果您有 Alexa 设备,只需访问 https://alexa.amazon.com/,然后点击设置,再点击历史记录

您将能够查看您与 Alexa 的每一次互动。这包括不成功的互动,例如 Alexa 未能理解您,或者只是录制了您不打算对她说的私人对话(这种情况时有发生)。这些条目可以展开,在大多数情况下,它们会包含一个小的“播放”图标,您可以点击它来收听对话。没有下载这些录音的选项。但是,您可以删除它们。

我并不想删除它们,因为我认为能够收听 Alexa 监听和记录(几乎)所有内容时的各种对话非常有趣。让我烦恼的是我无法下载这些录音。我们开发的 Python 脚本可以完成这项工作,同时根据日期和时间以及对话标题为每个录音提供一个逻辑性的文件名。

这里是它的样子。

准备 Python 环境

其余过程的准备工作需要安装 Python 和几个库。

下载和安装 Python

  1. 请使用以下 链接 进行下载。
  2. 安装完成后,将安装位置的路径添加到 PATH 环境变量中。

    默认位置将是

    • C:\Users\<您的用户名>\AppData\Roaming\Microsoft\Windows\Start Menu\Programs\Python 3.7-32

    您可以使用以下命令将此条目添加到 PATH

    set path "%path%;c:\Users\<YOUR USER NAME>\AppData\Local\Programs\Python\Python37-32"
  3. 打开命令提示符 (CMD) 并输入以下行
    python -m pip install selenium pygithub requests webdriver_manager

    您可能会收到以下警告。为确保我们的脚本能够顺利运行,请添加以下条目

    setx path "%path%;c:\Users\<YOUR USER NAME>\
    AppData\Local\Programs\Python\Python37-32\Scripts"

这将安装以下扩展

credentials.py 文件

我们使用一个单独的文件,您可以在其中输入您的亚马逊凭据,以便脚本可以自动登录您的帐户。

class Credentials:
    email = '*****'
    password = '******'

运行脚本

类型

python alexa.py

工作原理

脚本如下

登录 Alexa

以下函数用于通过您的亚马逊帐户登录您的 Alexa 历史记录

def amazon_login(driver, date_from, date_to):
    driver.implicitly_wait(5)
    logger.info("GET https://alexa.amazon.com/spa/index.html")
    # get main page
    driver.get('https://alexa.amazon.com/spa/index.html')
    sleep(4)
    url = driver.current_url
    # if amazon asks for signin, it will redirect to a page with signin in url
    if 'signin' in url:
        logger.info("Got login page: logging in...")
        # find email field
        # WebDriverWait waits until elements appear on the page
        # so it prevents script from failing in case page is still being loaded
        # Also if script fails to find the elements (which should not happen
        # but happens if your internet connection fails)
        # it is possible to catch TimeOutError and loop the script, so it will
        # repeat.
        check_field = WebDriverWait(driver, 30).until(
                EC.presence_of_element_located((By.ID, 'ap_email')))
        email_field = driver.find_element_by_id('ap_email')
        email_field.clear()
        # type email
        email_field.send_keys(Credentials.email)
        check_field = WebDriverWait(driver, 30).until(
                EC.presence_of_element_located((By.ID, 'ap_password')))
        # find password field
        password_field = driver.find_element_by_id('ap_password')
        password_field.clear()
        # type password
        password_field.send_keys(Credentials.password)
        # find submit button, submit
        check_field = WebDriverWait(driver, 30).until(
                EC.presence_of_element_located((By.ID, 'signInSubmit')))
        submit = driver.find_element_by_id('signInSubmit')
        submit.click()
    # get history page
    driver.get('https://www.amazon.com/hz/mycd/myx#/home/alexaPrivacy/'
               'activityHistory&all')
    sleep(4)
    # amazon can give second auth page, so repeat the same as above
    if 'signin' in driver.current_url:
        logger.info("Got confirmation login page: logging in...")
        try:
            check_field = WebDriverWait(driver, 30).until(
                    EC.presence_of_element_located((By.ID, 'ap_email')))
            email_field = driver.find_element_by_id('ap_email')
            email_field.clear()
            email_field.send_keys(Credentials.email)
            check_field = WebDriverWait(driver, 30).until(
                    EC.presence_of_element_located((By.ID, 'continue')))
            submit = driver.find_element_by_id('continue')
            submit.click()
            sleep(1)
        except:
            pass
        check_field = WebDriverWait(driver, 30).until(
                EC.presence_of_element_located((By.ID, 'ap_password')))
        password_field = driver.find_element_by_id('ap_password')
        password_field.clear()
        password_field.send_keys(Credentials.password)
        check_field = WebDriverWait(driver, 30).until(
                EC.presence_of_element_located((By.ID, 'signInSubmit')))
        submit = driver.find_element_by_id('signInSubmit')
        submit.click()
        sleep(3)
        logger.info("GET https://www.amazon.com/hz/mycd/myx#/home/alexaPrivacy/"
                   "activityHistory&all")
        # get history page again
        driver.get('https://www.amazon.com/hz/mycd/myx#/home/alexaPrivacy/'
                   'activityHistory&all')
    # find selector which allows to select Date Range 
    check = WebDriverWait(driver, 30).until(
            EC.presence_of_element_located(
                (By.CLASS_NAME, "a-dropdown-prompt")))
    history = driver.find_elements_by_class_name('a-dropdown-prompt')
    history[0].click()
    check = WebDriverWait(driver, 30).until(
            EC.presence_of_element_located(
                (By.CLASS_NAME, "a-dropdown-link")))
    # click 'All History'
    all_hist = driver.find_elements_by_class_name('a-dropdown-link')
    for link in all_hist:
        if date_from and date_to:
            if 'Custom' in link.text:
                link.click()
                from_d = driver.find_element_by_id('startDateId')
                from_d.clear()
                from_d.send_keys('11/03/2019')
                sleep(1)
                to_d = driver.find_element_by_id('endDateId')
                to_d.clear()
                to_d.send_keys('11/05/2019')
                subm = driver.find_element_by_id('submit')
                subm.click()
        elif 'All' in link.text:
            link.click()

启用下载

以下函数启用下载

def enable_downloads(driver, download_dir):
    driver.command_executor._commands["send_command"] = (
        "POST", '/session/$sessionId/chromium/send_command')
    params = {'cmd': 'Page.setDownloadBehavior', 
    'params': {'behavior': 'allow', 'downloadPath': download_dir}}
    command_result = driver.execute("send_command", params)

初始化驱动程序

以下函数初始化 Chrome 驱动程序。

def init_driver():
    logger.info("Starting chromedriver")
    chrome_options = Options()
    # use local data directory
    # headless mode can't be enabled since then amazon shows captcha
    chrome_options.add_argument("user-data-dir=selenium") 
    chrome_options.add_argument("start-maximized")
    chrome_options.add_argument("--disable-infobars")
    chrome_options.add_argument('--disable-gpu')  
    chrome_options.add_argument('--remote-debugging-port=4444')
    chrome_options.add_argument('--no-sandbox')
    chrome_options.add_argument('--disable-dev-shm-usage')
    chrome_options.add_argument("--mute-audio")
    path = os.path.dirname(os.path.realpath(__file__))
    if not os.path.isdir(os.path.join(path, 'audios')):
        os.mkdir(os.path.join(path, 'audios'))
    chrome_options.add_experimental_option("prefs", {
        "download.default_directory": os.path.join(path, 'audios'),
        "download.prompt_for_download": False,
        "download.directory_upgrade": True,
        "safebrowsing.enabled": True
    })
    try:
        driver = webdriver.Chrome(
            executable_path=ChromeDriverManager().install(), 
            options=chrome_options, service_log_path='NUL')
    except ValueError:
        logger.critical("Error opening Chrome. Chrome is not installed?")
        exit(1)
    driver.implicitly_wait(10)
    # set downloads directory to audios folder
    enable_downloads(driver, os.path.join(path, 'audios'))
    return driver 

下载页面内容

对于每个页面,我们都会获取所有录音并下载它们。由于没有直接下载这些录音的方法,只能播放它们,所以我们在这里进行了一些“黑客”操作……

我们基本上提取了一个ID 属性,该属性随后成为下载链接的一部分。

ID 属性大致看起来像这样(可能会有所不同)

audio-Vox:1.0/2019/10/27/21/1d2110cb8eb54f3cb6 

在此示例中,2019/10/27/21 是日期戳,整个 ID 被添加到用于下载此特定音频录音的链接中。

我们还使用存储在类为 summaryCss 的元素中的附加信息。

如果没有附加信息,则该元素将命名为“无法理解音频”。

def parse_page(driver):
    links = []
    # links will contain all links harvested from one page
    check = WebDriverWait(driver, 30).until(EC.presence_of_element_located(
                                                   (By.CLASS_NAME, "mainBox")))
    boxes = driver.find_elements_by_class_name('mainBox')
    # mainBox corresponds to each element with audio recording
    for box in boxes:
        # if there is no voice, element can be detected by its class and skipped
        non_voice = box.find_elements_by_class_name('nonVoiceUtteranceMessage')
        if non_voice:
            logger.info('Non-voice file. Skipped.')
            continue
        non_text = box.find_elements_by_class_name('textInfo')
        if non_text:
            if 'No text stored' in non_text[0].text:
                logger.info("Non-voice file. Skipped.")
                continue
        # else we can find audio element and extract its data
        check = WebDriverWait(driver, 30).until(EC.presence_of_element_located(
                                                       (By.TAG_NAME, "audio")))
        audio_el = box.find_elements_by_tag_name('audio')
        for audio in audio_el:
            try:
                attr = audio.get_attribute('id')
                # we extract ID attribute which then becomes a part of the link.
                # ID approximately looks like this (can vary):
                # audio-Vox:1.0/2019/10/27/21/1d2110cb8eb54f3cb6
                # here 2019/10/27/21 is the date, and the whole ID is being
                # added to the link to download said audio recording.

                # Additional info is stored in element with class summaryCss.
                # If there is no additional info then the element will be named
                # as 'audio could not be understood'.
                get_name = box.find_elements_by_class_name('summaryCss')
                if not get_name:
                    get_name = 'Audio could not be understood'
                else:
                    get_name = get_name[0].text
                # subInfo element contains date and device data which we extract
                check = WebDriverWait(driver, 30).until(
                    EC.presence_of_element_located((By.CLASS_NAME, "subInfo")))
                subinfo = box.find_elements_by_class_name('subInfo')
                time = subinfo[0].text
                # extracting date from ID attribute, since it is easier.
                get_date = re.findall(r'\/(\d+\/\d+\/\d+\/\d+)\/', attr)
                try:
                    # replace slashes to -.
                    get_date = get_date[0].strip().replace('/', '-')
                except IndexError:
                    try:
                        # in case there is no date in the attribute
                        # (which should not happen anymore)
                        # we extract date from subInfo element and turn it
                        # into normal, easy for sorting date, e.g 2019/10/11.
                        get_date = re.findall(
                            r'On\s(.*?)\s(\d{1,2})\,\s(\d{4})', time)
                        month = get_date[0][0]
                        new = month[0].upper() + month[1:3].lower()
                        month = strptime(new,'%b').tm_mon
                        get_date = f"{get_date[0][2]}-{month}-{get_date[0][1]}"
                    except IndexError:
                        get_date = re.findall(r'(.*?)\sat', time)
                        day = get_date[0]
                        if 'Yesterday' in day:
                            day = datetime.now() - timedelta(days=1)
                            day = str(day.day)
                        elif 'Today' in day:
                            day = str(datetime.now().day)
                        day = day if len(day) == 2 else '0'+day
                        curr_month = str(datetime.now().month)
                        curr_month = curr_month if len(
                                            curr_month) == 2 else '0'+curr_month
                        curr_year = datetime.now().year
                        get_date = f"{curr_year}-{curr_month}-{day}"
                # Extract exact time and device
                find_p0 = time.find('at')
                find_p1 = time.find('on')
                get_time = time[find_p0+2:find_p1-1].replace(':', '-')
                device = time[find_p1:]
                get_name = get_name
                # Form element name
                name = f"{get_date} {get_time} {get_name} {device}"
                # Strip all dangerous symbols from the name.
                # Dangerous symbols are symbols which Windows can not accept
                name = re.sub(r'[^\w\-\(\) ]+', '', name)
                # Allow maximum 3 duplicates
                # if there is such element already, 
                # (1)+n will be added to its name.
                for link in links:
                    if name == link[1]:
                        name += ' (1)'
                        break
                dup = 1
                while dup <= 3:
                    for link in links:
                        if name == link[1]:
                            name = name.replace(f"({dup})", f"({dup+1})")
                    dup += 1
                print("_"*80)
                logger.info(f"Found: {attr}\n{name}")
                # check if recording already exists on the disk
                if not os.path.isfile(os.path.join('audios', name+'.wav')):
                    if not '/' in attr:
                        # if ID is incorrect at all, we play the file
                        # and try to extract the link generated by amazon itself
                        logger.info(
                            "ID attribute was not found. Playing the file.")
                        play_icon = box.find_elements_by_class_name(
                                                                   'playButton')
                        get_onclick = play_icon[0].get_attribute('onclick')
                        driver.execute_script(get_onclick)
                        sleep(8)
                        get_source = box.find_elements_by_tag_name('source')
                        src = get_source[0].get_attribute('src')
                        # if we had success, link is appended to links
                        if 'https' in src:
                            links.append([src, name])
                        else:
                            logger.critical(
                                   "Link was not found after playing the file. "
                                   "Item skipped.")
                    else:
                        # If audio ID is valid, we replace audio with id
                        # and append it to the link.
                        # From now we can download it.
                        if attr.replace('audio-', ''):
                            attr = attr.replace('audio-', 'id=')
                            links.append([
                            'https://www.amazon.com/hz/mycd/playOption?'+attr,
                            name])
                else:
                    logger.info(f"File exists; passing: {name}.wav")
            except Exception:
                logger.critical(traceback.format_exc())
                logger.critical("Item failed; passing")
                continue
    return links 

我们的主函数

我们的 Main 函数根据 Credentials 类连接到亚马逊帐户并转到 Alexa 的历史记录。然后,它创建一个广泛的查询,涵盖从第一天到现在的整个历史记录。然后,它模拟人类在播放每条录音(亚马逊允许这样做)时的操作,但是,之后,它会定位用于播放此音频的音频文件并将其下载。

def main():
    ap = ArgumentParser()
    ap.add_argument(
        "-f", "--date_from", required=False, 
        help=("Seek starting from date MM/DD/YYYY.")
    )
    ap.add_argument(
        "-t", "--date_to", required=False,
        help=("Seek until date MM/DD/YYYY.")
    )
    args = vars(ap.parse_args())
    if args["date_from"] and not args["date_to"]:
        args["date_to"] = str(datetime.now().month) +'/'+ str(datetime.now(
                                        ).day) +'/'+ str(datetime.now().year)
    if args["date_to"] and not args["date_from"]:
        logger.critical("You haven't specified beginning date. Use -f option.")
        exit(1)

    sys_sleep = None
    sys_sleep = WindowsInhibitor()
    logger.info("System inhibited.")
    sys_sleep.inhibit()
    
    # start chromedriver
    driver = init_driver()

    while True:
        try:
            # login
            amazon_login(driver, args["date_from"], args["date_to"])
            break
        except TimeoutException:
            # catch broken connection
            logger.critical("Timeout exception. No internet connection? "
                            "Retrying...")
            sleep(10)
            continue

    # after few attempts will reset the page
    failed_page_attempt = 0
    while True:
        logger.info("Parsing links...")
        driver.implicitly_wait(2)

        try:
            # parse current page for audios
            links = parse_page(driver)
            # reset fail counter on each success
            failed_page_attempt = 0
        except TimeoutException:
            # catch broken connection
            logger.critical(traceback.format_exc())
            if failed_page_attempt <= 3:
                logger.critical("No Internet connection? Retrying...")
                logger.critical(f"Attempt #{failed_page_attempt}/3")
                sleep(5)
                failed_page_attempt += 1
                continue
            else:
                failed_page_attempt = 0
                logger.critical("Trying to re-render page...")
                driver.execute_script('getPreviousPageItems()')
                sleep(5)
                driver.execute_script('getNextPageItems()')
                continue

        logger.info(f"Total files to download: {len(links)}")

        for item in links:
            # download parsed items
            fetch(driver, item)

        # find the 'Next' button, which moves to the next page.
        failed_button_attempt = 0
        while True:
            try:
                check_btn = WebDriverWait(driver, 30).until(
                        EC.presence_of_element_located((By.ID, 'nextButton')))
                failed_button_attempt = 0
                break
            except TimeoutException:
                if failed_button_attempt <= 3:
                    logger.critical(
                            "Timeout exception: next button was not found. "
                            "No Internet connection? Waiting and retrying...")
                    logger.critical(f"Attempt #{failed_button_attempt}/3")
                    sleep(10)
                    failed_button_attempt += 1
                    continue
                else:
                    failed_button_attempt = 0
                    logger.critical("Trying to re-render page...")
                    driver.execute_script('getPreviousPageItems()')
                    sleep(5)
                    driver.execute_script('getNextPageItems()')
                    continue
        nextbtn = driver.find_element_by_id('nextButton').get_attribute('class')
        if 'navigationAvailable' in nextbtn:
            # if button is active, click it.
            driver.implicitly_wait(10)
            while True:
                try:
                    logger.info("Next page...")
                    driver.find_element_by_id('nextButton').click()
                    break
                except:
                    logger.critical("Unable to click the next button. "
                                    "Waiting and retrying...")
                    sleep(10)
                    continue
            continue
        else:
            # if button is inactive, this means it is the last page.
            # script is done here.
            break
    driver.close()
    driver.quit()
    if args['date_from']:
        logger.info('All done. Press Enter to exit.')
        i = input()
    else:
        logger.info("All done. Exit.")
    logger.info("System uninhibited.")
    sys_sleep.uninhibit()

获取选定的日期范围

也可以从给定的日期范围获取录音

为此,请使用 -f-t 选项,它们分别指定开始日期结束日期,例如:

python alexa.py -f 11/03/2019 -t 11/05/2019

关注点

有时,亚马逊可能会在识别到批量下载后阻止活动,在这种情况下,我们的脚本会等待然后恢复。这是执行此操作的代码。我们所做的是检查目标文件是否有效,如果无效(如果其大小为 0 字节),我们则重试。

                if os.path.isfile(os.path.join('audios', name+'.wav')):
                    if os.stat(os.path.join('audios', name+'.wav')).st_size == 0:
                        logger.info("File size is 0. Retrying.")
                        sleep(3)
                        continue

毕竟,如果亚马逊存储了我们的个人录音,为什么我们不能呢?

历史

  • 2019年11月7日:初始版本
© . All rights reserved.