Опубликовано: 02 янв 2019
Стриминг данных в BigQuery может быть полезен для решения различных аналитических задач. Среди типичных кейсов подобных задач можно перечислить:
Стриминг и сохранение данных в таблице BigQuery осуществляется с помощью двух составляющих. Первая, которую условно назовем "серверная часть", это приложение в Google Cloud Platform , которое может обрабатывать поступающие запросы и сохранять данные в таблице в BigQuery. Приложение может работать на Python, Node.js или любом другом языке поддерживаемом GCP.
Вторая составляющая - это "клиентская часть", javascript сниппет, который отправляет данные через GET или POST запрос (хит) из клиента (браузера) в endpoint нашего приложения в GСP. В интернете имеется ряд бесплатных решений, реализующих в той или иной степени данный процесс. Здесь в качестве основы будет рассмотрен следующий вариант, реализованный на Python: Google Analytics -> BigQuery streaming
Данное решение было протестировано мной на сайтах со средним объемом трафика. Следует отметить, что одно из существенных ограничений данного кода Python, как он есть — это умение обрабатывать только GET запросы. К примеру Google Analytics отправляет данные на свой сервер как через GET, так и POST запросы, поэтому javascript сниппет стриминга должен уметь преобразовывать все запросы отправляемые в BigQuery в GET запросы в случае стриминга данных аналитики.
A. Регистрируемся в Google Cloud Platform. Все сервисы в GCP можно попробовать бесплатно, но к аккаунту нужно будет привязать конкретную платежную карту, в частности это необходимо для использования BigQuery.
B. Создаем в GCP новый проект, в котором будет реализовано приложение (app), которое принимает, обрабатывает и сохраняет данные стриминга в таблицу BigQuery.
C. Переходим в BigQuery и создаем новый Dataset в нашем новом проекте, в котором затем программно будет создана таблица для хранения данных стриминга, оставив опцию Data expiration по умолчанию равной Never.
D. Разворачиваем наше приложение на Python в GCP:
Данная переменная будет нужна для работы Google Cloud SDK Shell.
Если Python уже установлен, при установке SDK Shell убираем галочку напротив Bundled Python.
Просматриваем файл настроек приложения app.yaml и при желании редактируем его, например, можно изменить URL нашего приложения:
runtime: python27
api_version: 1
threadsafe: yes
handlers:
- url: /tasks/process_queue
script: process_queue.app
login: admin
- url: /tasks/create_bq_table
script: create_bq_table.app
login: admin
- url: /t
script: main.app
libraries:
- name: webapp2
version: "2.5.2"
Редактируем файл bqloader.py
, указывая названия нашего проекта, имя dataset в bigquery и имя таблицы, которая будет создана.
При необходимости можно отредактировать схему данных (table_schema). В оригинальном коде, к примеру, нет параметров расширенной электронной торговли GA.
from oauth2client.appengine import AppAssertionCredentials
import httplib2
from apiclient import discovery
import logging
class BQLoader():
project_id = 'yourprojectid'
dataset_id = 'yourdataset'
table_id = 'hits'
table_schema= [
{
"name": "clientId",
"type": "STRING"
},
{
"name": "type",
"type": "STRING"
},
{
"name": "timestamp",
"type": "INTEGER"
},
Если ваши навыки Python позволяют, есть смысл дописать код логирования событий ошибок, как при обработке данных, так и ошибок вставок inserts
BigQuery streaming API.
Другой момент, на который следует обратить внимание - вы должны понимать, как BigQuery старается убирать дубликаты вставок, которые могут возникать при стриминге, а именно, как сформировать и использовать уникальный insertId
для метода insertAll
.
Подробнее об этом можно прочесть в официальных доках:
- Streaming data into BigQuery
- Troubleshooting streaming inserts
- Method: tabledata.insertAll
В случае внесения изменений в схему таблицы данных или URL проекта редактируем файл main.py, правя соответствующую логику:
#!/usr/bin/env python
import webapp2
import json
import string
import base64
from google.appengine.api import taskqueue
from urlparse import urlparse
from urlparse import parse_qs
import logging
import time
import re
class MainHandler(webapp2.RequestHandler):
params_mapping = {
'cid': 'clientId',
'uid': 'userId',
't': 'type',
'tid': 'tid',
'dr': 'fullReferrer',
'sr': 'device.resolution',
'vp': 'device.viewport',
'sd': 'device.colors',
'ul': 'device.language',
'dl': 'page.location',
'dh': 'page.hostname',
'dp': 'page.path',
'dt': 'page.title',
'ec': 'event.category',
'ea': 'event.action',
'el': 'event.label',
'ev': 'event.value',
'ni': 'event.nonint',
'ti': 'transaction.id',
'ta': 'transaction.affiliation',
'tr': 'transaction.revenue',
'tcc': 'transaction.coupon',
'cu': 'transaction.currency',
'pa': 'funnel.action',
'pal': 'funnel.list',
'cos': 'funnel.checkoutStep',
'col': 'funnel.checkoutOption'
}
def process_ga_params(self, params):
data = {}
parsed_url = urlparse(params['dl'])
* * * * * * * *
return data
app = webapp2.WSGIApplication([
('/t', MainHandler)
], debug=True)
cd C:\Users\ys\Desktop\gabq-master
gcloud auth login
Посмотреть авторизованных пользователей можно с помощью команды:
gcloud auth list
При необходимости выбираем нужный аккаунт из списка:
gcloud config set account ACCOUNT
Указываем в конфигурации название нашего проекта:
gcloud config set project yourprojectname
Загружаем проект на GCP:
gcloud app deploy
Регион для приложения можно указать тот же, что и регион выбранный ранее для проекта. Если загрузка прошла успешно, в результате вы увидите сообщение, подтверждающее загрузку файлов:
gcloud app deploy cron.yaml
gcloud app deploy queue.yaml
gcloud app deploy index.yaml
Файл app.yaml
загружается автоматически при разворачивании проекта
Можно закрыть Google Cloud SDK Shell - проект загружен.
Для работы данное app вначале создает новую таблицу в BigQuery. Для этого нужно открыть в браузере следующий URL -
https://[your-app].appspot.com/tasks/create_bq_table
Подтверждением успешно созданной таблицы будет сообщение OK в браузере. и естественно после этого новая таблица будет доступна в Bigquery.
После этого приложение в GCP может принимать данные стриминга от клиентской части, отправляемые с помощью javascript сниппета. Данные буферизируются, поэтому непосредственно в таблице их можно будет увидеть с небольшой задержкой.
В GCP вы обновляете проект в случае необходимости посредством загрузки новой версии вашего приложения. Чтобы GCP не разделяла трафик между старой и новой версией приложения, можно указать соответствующие опции в команде загрузки:
gcloud app deploy --promote --stop-previous-version
Обновлять файлы конфигурации можно без загрузки нововй версии. Например если вам нужно обновить конфигурвцию CRON, достаточно воспользоваться командой:
gcloud app deploy cron.yaml
Нужно помнить о том, что в GAE вы платите по принципу pay as you go, при этом ресурсы, выделяемые вашему приложению, будут масштабировать автоматически при увеличении трафика и объема обрабатываемых данных. Большой всплеск трафика может заметно отразиться на сумме к оплате в вашем биллинг-аккаунте.
Избежать такой непредвиденной траты можно и нужно с помощью установок лимита расходов для вашего приложения в GAE. Установить лимит можно, например, на странице главного дашборда App Engine в блоке Billing status, установив сумму для Daily spending limit.
Понятно, что по достижению лимита приложение перестанет обрабатывать ваш трафик, работа возобновиться только на следующий день. Это пожалуй основной минус такого облачного решения.
Мне однажды пришлось столкнуться с такой ситуацией. Причиной резкого всплеска, было то, что в аналитику и соответственно в BigQuery отправлялись хиты о javascript ошибках на сайте. В какой-то момент один блок неисправного кода начал генерировать ошибку рекурсивно в большом объеме, спамя javascript console и создавая большой множество хитов в GA и BQ.
Картина в логах работы приложения была приблизительно такая:
Несмотря на то, что отслеживание js-ошибок в GTM было отключено в течение суток, за счет того, что страница сайта закешировалась, такой спамовый трафик, как вы видите на графике, продолжался еще несколько дней. И тут приходится выбирать, либо все же позволить себе лишние траты, но сохранить все поступающие данные, либо оперативно предпринять меры по снижению роста расходов, что к сожалению скорее всего приведет к остановкам работы приложения по достижению соответствующих лимитив квот.
Если вы выбираете второе, вот что можно сделать:
Если после устранения ошибок на сайте, вы по прежнему видите большой трафик спама с закешированных страниц, можно изменить точку логирования данных. Это нужно сделать в двух местах непосредственно в самом приложении — в файле app.yaml
и в файле main.py
(и загрузить новую версию), а также везде, где нужно в коде вашего javascript кода, ответственного за отправку хитов на вашем сайте.
После этого хиты с закешироанных страниц не будут обрабатываться программно, но будут генерировать 404-е ошибки.
В разделе Cloud Tasks можно очистить накопившиеся данные в очереди pull-queue, нажав кнопку PURGE QUEUE.
protoPayload.status:200
и убедиться, что с "нормальным" трафиком все ok.