websitelytics

Menu

Стриминг данных в BigQuery

Опубликовано: 02 янв 2019

Стриминг данных в BigQuery может быть полезен для решения различных аналитических задач. Среди типичных кейсов подобных задач можно перечислить:

Стриминг и сохранение данных в таблице BigQuery осуществляется с помощью двух составляющих. Первая, которую условно назовем "серверная часть", это приложение в Google Cloud Platform , которое может обрабатывать поступающие запросы и сохранять данные в таблице в BigQuery. Приложение может работать на Python, Node.js или любом другом языке поддерживаемом GCP.

Вторая составляющая - это "клиентская часть", javascript сниппет, который отправляет данные через GET или POST запрос (хит) из клиента (браузера) в endpoint нашего приложения в GСP. В интернете имеется ряд бесплатных решений, реализующих в той или иной степени данный процесс. Здесь в качестве основы будет рассмотрен следующий вариант, реализованный на Python: Google Analytics -> BigQuery streaming

Данное решение было протестировано мной на сайтах со средним объемом трафика. Следует отметить, что одно из существенных ограничений данного кода Python, как он есть — это умение обрабатывать только GET запросы. К примеру Google Analytics отправляет данные на свой сервер как через GET, так и POST запросы, поэтому javascript сниппет стриминга должен уметь преобразовывать все запросы отправляемые в BigQuery в GET запросы в случае стриминга данных аналитики.

Шаги реализации

A. Регистрируемся в Google Cloud Platform. Все сервисы в GCP можно попробовать бесплатно, но к аккаунту нужно будет привязать конкретную платежную карту, в частности это необходимо для использования BigQuery.

blog

B. Создаем в GCP новый проект, в котором будет реализовано приложение (app), которое принимает, обрабатывает и сохраняет данные стриминга в таблицу BigQuery.

blog

C. Переходим в BigQuery и создаем новый Dataset в нашем новом проекте, в котором затем программно будет создана таблица для хранения данных стриминга, оставив опцию Data expiration по умолчанию равной Never.

blog

D. Разворачиваем наше приложение на Python в GCP:

1. Сохраняем код из репозитория Github в локальную директорию
2. Устанавливаем Python версии 2.7 на локальную машину, при этом в случае Windows при установке указываем опцию "добавить переменную Path".

Данная переменная будет нужна для работы Google Cloud SDK Shell.

3. Устанавливаем Google Cloud SDK Shell, с помощью которой мы будем разворачивать наше приложение в GCP.

Если Python уже установлен, при установке SDK Shell убираем галочку напротив Bundled Python.

4. Редактируем файлы приложения Python:

Просматриваем файл настроек приложения app.yaml и при желании редактируем его, например, можно изменить URL нашего приложения:

runtime: python27
api_version: 1
threadsafe: yes

handlers:
- url: /tasks/process_queue
  script: process_queue.app
  login: admin
- url: /tasks/create_bq_table
  script: create_bq_table.app
  login: admin
- url: /t
  script: main.app

libraries:
- name: webapp2
  version: "2.5.2"

Редактируем файл bqloader.py , указывая названия нашего проекта, имя dataset в bigquery и имя таблицы, которая будет создана.

При необходимости можно отредактировать схему данных (table_schema). В оригинальном коде, к примеру, нет параметров расширенной электронной торговли GA.

from oauth2client.appengine import AppAssertionCredentials
import httplib2
from apiclient import discovery
import logging

class BQLoader():
    project_id = 'yourprojectid'
    dataset_id = 'yourdataset'
    table_id = 'hits'

    table_schema= [
        {
            "name": "clientId",
            "type": "STRING"
        },       
        {
            "name": "type",
            "type": "STRING"
        },
        {
            "name": "timestamp",
            "type": "INTEGER"
        },

Если ваши навыки Python позволяют, есть смысл дописать код логирования событий ошибок, как при обработке данных, так и ошибок вставок inserts BigQuery streaming API.

Другой момент, на который следует обратить внимание - вы должны понимать, как BigQuery старается убирать дубликаты вставок, которые могут возникать при стриминге, а именно, как сформировать и использовать уникальный insertId для метода insertAll.

Подробнее об этом можно прочесть в официальных доках:
- Streaming data into BigQuery
- Troubleshooting streaming inserts
- Method: tabledata.insertAll

В случае внесения изменений в схему таблицы данных или URL проекта редактируем файл main.py, правя соответствующую логику:

#!/usr/bin/env python

import webapp2
import json
import string
import base64
from google.appengine.api import taskqueue
from urlparse import urlparse
from urlparse import parse_qs
import logging
import time
import re

class MainHandler(webapp2.RequestHandler):
    params_mapping = {
        'cid': 'clientId',
        'uid': 'userId',
        't': 'type',
        'tid': 'tid',
        'dr': 'fullReferrer',
        'sr': 'device.resolution',
        'vp': 'device.viewport',
        'sd': 'device.colors',
        'ul': 'device.language',
        'dl': 'page.location',
        'dh': 'page.hostname',
        'dp': 'page.path',
        'dt': 'page.title',
        'ec': 'event.category',
        'ea': 'event.action',
        'el': 'event.label',
        'ev': 'event.value',
        'ni': 'event.nonint',
        'ti': 'transaction.id',
        'ta': 'transaction.affiliation',
        'tr': 'transaction.revenue',
        'tcc': 'transaction.coupon',
        'cu': 'transaction.currency',
        'pa': 'funnel.action',
        'pal': 'funnel.list',
        'cos': 'funnel.checkoutStep',
        'col': 'funnel.checkoutOption'
    }

    def process_ga_params(self, params):
        data = {}

        parsed_url = urlparse(params['dl'])


        * * * * * * * *

        return data



app = webapp2.WSGIApplication([
    ('/t', MainHandler)
], debug=True)
5. Непосредственно разворачиваем проект с помощью команд в терминале SDK Shell
cd C:\Users\ys\Desktop\gabq-master
gcloud auth login

Посмотреть авторизованных пользователей можно с помощью команды:

gcloud auth list

При необходимости выбираем нужный аккаунт из списка:

gcloud config set account ACCOUNT

Указываем в конфигурации название нашего проекта:

gcloud config set project yourprojectname

Загружаем проект на GCP:

gcloud app deploy

Регион для приложения можно указать тот же, что и регион выбранный ранее для проекта. Если загрузка прошла успешно, в результате вы увидите сообщение, подтверждающее загрузку файлов: blog

gcloud app deploy cron.yaml
gcloud app deploy queue.yaml
gcloud app deploy index.yaml 

Файл app.yaml загружается автоматически при разворачивании проекта
Можно закрыть Google Cloud SDK Shell - проект загружен.

6. Создаем таблицу

Для работы данное app вначале создает новую таблицу в BigQuery. Для этого нужно открыть в браузере следующий URL -
https://[your-app].appspot.com/tasks/create_bq_table
Подтверждением успешно созданной таблицы будет сообщение OK в браузере. и естественно после этого новая таблица будет доступна в Bigquery.

После этого приложение в GCP может принимать данные стриминга от клиентской части, отправляемые с помощью javascript сниппета. Данные буферизируются, поэтому непосредственно в таблице их можно будет увидеть с небольшой задержкой.

В GCP вы обновляете проект в случае необходимости посредством загрузки новой версии вашего приложения. Чтобы GCP не разделяла трафик между старой и новой версией приложения, можно указать соответствующие опции в команде загрузки:

gcloud app deploy --promote --stop-previous-version

Обновлять файлы конфигурации можно без загрузки нововй версии. Например если вам нужно обновить конфигурвцию CRON, достаточно воспользоваться командой:

gcloud app deploy cron.yaml


Особые кейсы

Непредвиденный всплеск трафика

Нужно помнить о том, что в GAE вы платите по принципу pay as you go, при этом ресурсы, выделяемые вашему приложению, будут масштабировать автоматически при увеличении трафика и объема обрабатываемых данных. Большой всплеск трафика может заметно отразиться на сумме к оплате в вашем биллинг-аккаунте.

Избежать такой непредвиденной траты можно и нужно с помощью установок лимита расходов для вашего приложения в GAE. Установить лимит можно, например, на странице главного дашборда App Engine в блоке Billing status, установив сумму для Daily spending limit.

Понятно, что по достижению лимита приложение перестанет обрабатывать ваш трафик, работа возобновиться только на следующий день. Это пожалуй основной минус такого облачного решения.

Мне однажды пришлось столкнуться с такой ситуацией. Причиной резкого всплеска, было то, что в аналитику и соответственно в BigQuery отправлялись хиты о javascript ошибках на сайте. В какой-то момент один блок неисправного кода начал генерировать ошибку рекурсивно в большом объеме, спамя javascript console и создавая большой множество хитов в GA и BQ.

Картина в логах работы приложения была приблизительно такая: blog

Несмотря на то, что отслеживание js-ошибок в GTM было отключено в течение суток, за счет того, что страница сайта закешировалась, такой спамовый трафик, как вы видите на графике, продолжался еще несколько дней. И тут приходится выбирать, либо все же позволить себе лишние траты, но сохранить все поступающие данные, либо оперативно предпринять меры по снижению роста расходов, что к сожалению скорее всего приведет к остановкам работы приложения по достижению соответствующих лимитив квот.

Если вы выбираете второе, вот что можно сделать:

  1. Если после устранения ошибок на сайте, вы по прежнему видите большой трафик спама с закешированных страниц, можно изменить точку логирования данных. Это нужно сделать в двух местах непосредственно в самом приложении — в файле app.yaml и в файле main.py (и загрузить новую версию), а также везде, где нужно в коде вашего javascript кода, ответственного за отправку хитов на вашем сайте. После этого хиты с закешироанных страниц не будут обрабатываться программно, но будут генерировать 404-е ошибки.

  2. В разделе Cloud Tasks можно очистить накопившиеся данные в очереди pull-queue, нажав кнопку PURGE QUEUE.

blog

  1. В логах GAE (Logging > Logs Viewer) отфильтровать данные по условию protoPayload.status:200 и убедиться, что с "нормальным" трафиком все ok.