Опубликовано: 07 июл 2019
Предположим, что вам нужно регулярно, например на ежедневной основе, загружать какие-то данные в BigQuery, и вы хотите сделать этот процесс автоматическим. Один из способов решения данной задачи — это автоматический экспорт данных json (новая строка разделитель) при загрузке соответствующего файла в Google Cloud Storage в какую-нибудь определенную папку вашего проекта с помощью Cloud Functions.
Хороший юскейс для подобной задачи - это построение Сквозной аналитики на базе BigQuery
Особенность Cloud Functions заключается в том, что это в некотором смысле квинтэссенция serverless-решений, где для выполнения определенных полезных действий по выбранному вами триггеру не нужен ни север, ни API, ни процесс авторизации, если все происходит в рамках одного проекта, ни секретные ключи, т. е. в общем-то ничего, кроме нескольких строчек кода самой функции, и пары строчек кода в файле конфигурации зависимостей package.json
.
В данном примере, мы напишем функцию на javascript
для окружения Node.js
, но последовательность действий будет общая и для других языков и окружений.
Если ваш проект, к примеру, называется jsonuploading
, то это можно будет сделать по ссылке console.cloud.google.com/apis/dashboard?project=jsonuploading
Затем соответственно вам нужно в Google Cloud Storage для этого же проекта создать bucket
, т. е. папку, в которую вы будете загружать json файлы, и которая будет привязана в вашей функции. Сделать это можно по ссылке console.cloud.google.com/storage/create-bucket?project=jsonuploading
При конфигурировании новой папки для параметра storage class
можно выбрать Multi-Regional
, а для Access control model
соответственно Set permissions uniformly at bucket-level
.
В BigQuery в выбранном проекте создаем, если еще не сделали этого, новый Dataset
в который и будут добавляться таблицы с данными из json-файлов.
Чтобы можно было строить запросы по всем добавляемым таблицам, мы будем все файлы именовать одинаково, меняя в названии только цифры даты за которую, к примеру, создан файл. Т.е. файлы могут называться так: orders_20190701.json
, orders_20190702.json
, orders_20190703.json
и т. п. Таблицы, создаваемые в BigQuery для этих файлов будут копировать название из загружаемого файла.
Заходим в интерфейс Cloud Functions в нашем проекте по ссылке console.cloud.google.com/functions/list?project=jsonuploading
и нажимаем кнопку Create Function.
В открывшемся интерфейсе мы можем создавать и редактировать наши функции. Задаем имя функции, а в качестве триггера и события выбираем соответственно Cloud Storage и Finalize/Create, поскольку мы хотим вызывать функцию по факту загрузки файла в Cloud Storage.
Во вкладке внутреннего редакторы index.js мы будем писать код нашей функции, а во вкладке pakage.json мы укажем зависимости. Наша исполняемая функция будет называться bqload, что также нужно указать в соответствующем поле Function to execute, расположенном ниже.
Первый блок кода нашей функции — это импорт необходимых модулей:
const assert = require('assert');
const path = require('path');
const {BigQuery} = require('@google-cloud/bigquery');
const {Storage} = require('@google-cloud/storage');
const bigquery = new BigQuery();
const storage = new Storage({
projectId: 'jsonsuploading',
});
Далее мы задаем схему создаваемых таблиц в BigQuery, которая естественно должна соответствовать данным в json для загрузки.
В качестве тестовых данных мы будем использовать такой файл:
{"orderID":"76755755","custID":"394834","custType":"WIP","location":"Moscow","date":"2018-12-16","purchasedItems":[{"sku":"product1","description":"Суаер товар номер один","quantity":2,"price":290},{"sku":"product2","description":"Пример другого товара","quantity":3,"price":230}]}
{"orderID":"76755756","custID":"195678","custType":"Ordinary","location":"SanktPeterburg","date":"2018-12-16","purchasedItems":[{"sku":"product3","description":"Третий товар","quantity":10,"price":9},{"sku":"product7","description":"Некий товар под номером семь","quantity":3,"price":130}]}
Скачать файл
Две строчки для примера будет достаточно, но естественно файл может быть практическе насколько угодно большим. Об ограничениях можно почитать здесь - Loading JSON data from Cloud Storage
Схема, которую мы зададим в нашем коде для такой структуры json-данных будет следующей:
const tableschema = [
{
"name": "date",
"type": "DATE",
"mode": "NULLABLE"
},
{
"name": "location",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "purchasedItems",
"type": "RECORD",
"mode": "REPEATED",
"fields": [
{
"name": "price",
"type": "INTEGER",
"mode": "NULLABLE"
},
{
"name": "quantity",
"type": "INTEGER",
"mode": "NULLABLE"
},
{
"name": "description",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "sku",
"type": "STRING",
"mode": "NULLABLE"
}
]
},
{
"name": "custType",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "custID",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "orderID",
"type": "STRING",
"mode": "NULLABLE"
}
];
Теперь сама исполняемая функция. Смотрите комментарии в коде.
exports.bqload = (event) => {
const file = event;
console.log(`File ${file.name} start procesing.`);
const datasetId = 'uploading';
const tableId = path.parse(file.name).name;
const ext = path.parse(file.name).ext
const jobMetadata = {
schema: {
fields: tableschema
},
sourceFormat : 'NEWLINE_DELIMITED_JSON', // указываем какой формат файла для импорта
createDisposition : 'CREATE_IF_NEEDED', // если таблицы с таким названием нет, создаем новую
writeDisposition: 'WRITE_TRUNCATE' // если таблица существует, перезаписываем данные
};
if (ext == '.json') {
return bigquery
.dataset(datasetId)
.table(tableId)
.load(storage.bucket(file.bucket).file(file.name), jobMetadata)
.then(results => {
const job = results[0];
// load() waits for the job to finish
assert.equal(job.status.state, 'DONE');
console.log(`File ${file.name} processed. Job ${job.id} completed.`);
// Check the job's status for errors
const errors = job.status.errors;
if (errors && errors.length > 0) {
throw errors;
}
})
.catch(err => {
console.error('ERROR:', err);
});
} else {
return console.error(`File ${file.name} uploaded has ${ext} extension`);
}
};
Скачать весь код функции
Теперь переходим во вкладку редактора pakage.json и там прописываем зависимости:
{
"dependencies": {
"@google-cloud/bigquery": "^2.0.3",
"@google-cloud/storage": "^2.3.3"
}
}
На этом функция готова, нажимаем кнопку Deploy для загрузки и активации функции.
Переходим в Cloud Storage и загружаем в наш bucket файл для примера. Если все ok, через минуту вы увидите новую таблицу в выбранном data set в BigQuery.
Пример запроса к сабсету таблиц:
SELECT * FROM `jsonstreaming.uploading.orders_2019*` WHERE _TABLE_SUFFIX BETWEEN '0701' AND '0703'
Наша функция логирует несколько сообщений о об этапах ее работы, которые можно посмотреть в разделе Stackdriver Logging → Logs Viewer по ссылке console.cloud.google.com/logs/viewer
.
Там же можно увидеть и ошибки, если таковые будут.
Здесь описан самый базовый алгоритм развертывания функции непосредственно в веб интерфейсе Cloud Fanctions. Это сделано для наглядности последовательности действий. С помощью же дополнительных инструментов вы можете писать код функции и даже в некоторой степени тестировать ее на своем компьютере в удобном для вас редакторе. Информация по данным инструментам доступна в соответствующих сообществах.