kashinoki38 blog

something like tech blog

RDSのスナップショットエクスポートをSpectrumを経由することでRedshiftで暗黙型変換させる(Decimal→BIGINT)

RDS のスナップショットを S3 にエクスポートすることができるが、そのフォーマットは Parquet になる。

Parquet にはスキーマがあり、Redshift 側のデータ型を適切に定義していないと、COPY でロードした場合にエラーが出る。
その辺の話。

Parquet スキーマと Redshift のデータ型不整合で出るエラー

例えば、Parquet スキーマfixed_len_byte_arrayDecimal(20,0) アノテーションがついているカラムを Redshift の BIGINT にロードしようとすると次のようなエラーがでる。

Spectrum Scan Error. File 'https://s3.ap-northeast-1.amazonaws.com/xxxxx/part-xxxxxx.gz.parquet' has an incompatible Parquet schema for column 's3://xxxxx/part-xxxxxx.gz.parquet.id'. Column type: BIGINT, Parquet schema: optional fixed_len_byte_array id [i:0 d:1 r:0] (s3://xxxxx/part-xxxxxx

▼Redshift テーブル定義

CREATE TABLE "xxxxx"(
  "id" BIGINT ENCODE LZO NULL,
  ...
);

▼Parquet スキーマ

>>> import pyarrow.parquet as pq
>>> parquet_file = pq.ParquetFile('part-xxxxx.gz.parquet')
>>> print(parquet_file.schema)
<pyarrow._parquet.ParquetSchema object at 0x7f68581b1040>
required group field_id=0 spark_schema {
  optional fixed_len_byte_array(9) field_id=1 id (Decimal(precision=20, scale=0));
  ...
}

解決策①:Redshift のテーブル定義を Parquet スキーマに合わせる

単純に、COPY先の Redshift のテーブル定義を Parquet スキーマに合わせる方法。
この場合、BIGINT ではなく NUMERIC として Redshift のカラムを定義することでCOPYできる。

CREATE TABLE "xxxxx"(
  "id" NUMERIC(20,0) ENCODE LZO NULL,
  ...
);

Parquet スキーマを都度確認、あるいは以下のドキュメントベースで RDS のカラムから返還される Parquet のスキーマを確認し、それに合わせて一時テーブルを Redshift 側で用意し、INSERT INTO SELECT で本テーブルにキャストする。

※RDS MySQLの型から、Parquetのどの型にマッピングされるかは以下にて確認可能
https://docs.aws.amazon.com/ja_jp/AmazonRDS/latest/UserGuide/USER_ExportSnapshot.html#USER_ExportSnapshot.data-types:~:text=%E3%81%B8%E3%81%AE%E3%83%9E%E3%83%83%E3%83%94%E3%83%B3%E3%82%B0-,MySQL%20%E3%81%8A%E3%82%88%E3%81%B3MariaDB%20%E3%83%87%E3%83%BC%E3%82%BF%E5%9E%8B%E3%81%AE%20Parquet%20%E3%81%B8%E3%81%AE%E3%83%9E%E3%83%83%E3%83%94%E3%83%B3%E3%82%B0,-%E6%AC%A1%E3%81%AE%E8%A1%A8

BIGINT UNSIGNED だと FIXED_LEN_BYTE_ARRAY(9)DECIMAL(20,0) アノテーションとなることがわかる。

解決案②:Redshift Spectrum参照

都度スキーマを確認して、対応する型の一時テーブルを用意するのは面倒なので、 Spectrum を経由することで使って Redshift の暗黙の型変換(DECIMALBIGINT)が使えるようにする方法。

DECIMAL→BIGINTは暗黙的に型変換される。

手順

Redshift にて外部スキーマの作成

CREATE EXTERNAL SCHEMA <external_table_name> FROM DATA CATALOG DATABASE <external_db_name> IAM_ROLE XXXX CREATE EXTERNAL DATABASE IF NOT EXISTS;

同時に Glue のデータカタログのDBも作成される

※以下リンクの外部スキーマの作成移行の手順が参考になる。
https://dev.classmethod.jp/articles/redshift_2_parquet_spectrum_easy/#anc6

Glue クローラーにて S3 上の Parquet のディレクトリを指定

出力のDBを上記の <external_db_name> にする

※この際単ファイルを対象にするとクエリ時にエラーが出るため、ディレクトリパスを指定する https://stackoverflow.com/questions/62278504/xx000500310-amazon500310-invalid-operation-parsed-manifest-is-not-a-val

Parsed manifest is not a valid JSON object.

※Glue クローラーを使わずに Redshift にて CREATE EXTERNAL TABLE 文で外部表を作成することも可能だが、その場合 Parquet の型に合わせて外部表のテーブル定義が必要なので①と似たようなことになるため、クローラーのほうが型を意識しなくて済むので楽そう

Glue クローラーのオンデマンド実行

Parquet ファイルのテーブルカタログ(カラムごとの型情報)が Glue カタログのDB(<external_db_name>)内に作成される。

この時点では対象列は Decimal(20,0)

spectrmdbを外部DBにして外部スキーマを定義すると、そのスキーマ経由でGlueのDB内のテーブルにアクセスできる
※Glueのテーブル名はS3のパス名になる(クローラーを使うと)

Redshift spectrum にて上記 Glue カタログのテーブルはクエリ可能

SELECT * FROM <external_table_name>.<table_name>

一番最初に作成した外部スキーマから参照可能

本テーブルにINSERT INTO SELECT

INSERT INTO redshift_target_table SELECT * FROM <external_table_name>.<table_name>;

この際に Redshift の暗黙型変換が利用可能。

利用できる Redshift クエリ

外部スキーマ作成

create external schema <external_schema_name>
from data catalog 
database 'spectrumdb' 
iam_role 'arn:aws:iam::xxxxxxx:role/xxxxxx'
create external database if not exists;

spectrmdbを外部DBにして外部スキーマを定義すると、そのスキーマ経由でGlueのDB内のテーブルにアクセスできる

外部テーブル作成(この場合型定義が必要、クローラーのほうが楽)

create external table <external_table_name>(
  "id"  BIGINT,
  ...)
STORED AS parquet
  location 's3://xxxxxxx/';

外部表のクエリ

select * from <external_schema_name>.<external_table_name>

外部表からRedshift表へのINSERT INTO SELECT

insert into <target_table> (select * from <external_schema_name>.<external_table_name>)

外部スキーマの定義確認

select esoid, nspname as schemaname, nspowner, esdbname as external_db, esoptions 
from pg_namespace a,pg_external_schema b where a.oid=b.esoid;

スキーマ内のテーブルの確認

select distinct(tablename) from pg_table_def
where schemaname = '<external_schema_name>';