test_list_rows.py
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
import decimal

from dateutil import relativedelta

from google.cloud import bigquery
from google.cloud.bigquery import enums
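
# NOTE: the fixtures used below (bigquery_client, table_id, scalars_table,
# scalars_extreme_table, scalars_table_csv, scalars_table_pico) are assumed to
# be provided by the system-test conftest.py; they are not defined in this module.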


def test_list_rows_empty_table(bigquery_client: bigquery.Client, table_id: str):
    from google.cloud.bigquery.table import RowIterator

    table = bigquery_client.create_table(table_id)

    # It's a bit silly to list rows for an empty table, but this does
    # happen as the result of a DDL query from an IPython magic command.
    rows = bigquery_client.list_rows(table)
    assert isinstance(rows, RowIterator)
    assert tuple(rows) == ()


def test_list_rows_page_size(bigquery_client: bigquery.Client, table_id: str):
    num_items = 7
    page_size = 3
    num_pages, num_last_page = divmod(num_items, page_size)
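    # divmod(7, 3) == (2, 1): expect two full pages of 3 rows and a final page of 1.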
    to_insert = [{"string_col": "item%d" % i, "rowindex": i} for i in range(num_items)]
    bigquery_client.load_table_from_json(to_insert, table_id).result()

    df = bigquery_client.list_rows(
        table_id,
        selected_fields=[bigquery.SchemaField("string_col", enums.SqlTypeNames.STRING)],
        page_size=page_size,
    )
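    # list_rows returns a RowIterator; its ``pages`` attribute yields one page
    # per underlying API response, each holding at most ``page_size`` rows.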
    pages = df.pages

    for i in range(num_pages):
        page = next(pages)
        assert page.num_items == page_size
    page = next(pages)
    assert page.num_items == num_last_page


def test_list_rows_scalars(bigquery_client: bigquery.Client, scalars_table: str):
    rows = sorted(
        bigquery_client.list_rows(scalars_table), key=lambda row: row["rowindex"]
    )
    row = rows[0]
    assert row["bool_col"]  # True
    assert row["bytes_col"] == b"Hello, World!"
    assert row["date_col"] == datetime.date(2021, 7, 21)
    assert row["datetime_col"] == datetime.datetime(2021, 7, 21, 11, 39, 45)
    assert row["geography_col"] == "POINT(-122.0838511 37.3860517)"
    assert row["int64_col"] == 123456789
    assert row["interval_col"] == relativedelta.relativedelta(
        years=7, months=11, days=9, hours=4, minutes=15, seconds=37, microseconds=123456
    )
    assert row["numeric_col"] == decimal.Decimal("1.23456789")
    assert row["bignumeric_col"] == decimal.Decimal("10.111213141516171819")
    assert row["float64_col"] == 1.25
    assert row["string_col"] == "Hello, World!"
    assert row["time_col"] == datetime.time(11, 41, 43, 76160)
    assert row["timestamp_col"] == datetime.datetime(
        2021, 7, 21, 17, 43, 43, 945289, tzinfo=datetime.timezone.utc
    )

    nullrow = rows[1]
    for column, value in nullrow.items():
        if column == "rowindex":
            assert value == 1
        else:
            assert value is None


def test_list_rows_scalars_extreme(
    bigquery_client: bigquery.Client, scalars_extreme_table: str
):
    rows = sorted(
        bigquery_client.list_rows(scalars_extreme_table),
        key=lambda row: row["rowindex"],
    )
    row = rows[0]
    assert row["bool_col"]  # True
    assert row["bytes_col"] == b"\r\n"
    assert row["date_col"] == datetime.date(9999, 12, 31)
    assert row["datetime_col"] == datetime.datetime(9999, 12, 31, 23, 59, 59, 999999)
    assert row["geography_col"] == "POINT(-135 90)"
    assert row["int64_col"] == 9223372036854775807
assert row["interval_col"] == relativedelta.relativedelta(
years=-10000, days=-3660000, hours=-87840000
)
assert row["numeric_col"] == decimal.Decimal(f"9.{'9' * 37}E+28")
assert row["bignumeric_col"] == decimal.Decimal(f"9.{'9' * 75}E+37")
assert row["float64_col"] == float("Inf")
assert row["string_col"] == "Hello, World"
assert row["time_col"] == datetime.time(23, 59, 59, 999999)
assert row["timestamp_col"] == datetime.datetime(
9999, 12, 31, 23, 59, 59, 999999, tzinfo=datetime.timezone.utc
)
nullrow = rows[4]
for column, value in nullrow.items():
if column == "rowindex":
assert value == 4
else:
assert value is None


def test_list_rows_range(bigquery_client: bigquery.Client, scalars_table_csv: str):
    rows = bigquery_client.list_rows(scalars_table_csv)
    rows = list(rows)
    row = rows[0]
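    # A RANGE<DATE> value is read back as a dict with "start" and "end" keys
    # holding datetime.date objects; a NULL range comes back as None (checked below).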
    expected_range = {
        "start": datetime.date(2020, 1, 1),
        "end": datetime.date(2020, 2, 1),
    }
    assert row["range_date"] == expected_range

    row_null = rows[1]
    assert row_null["range_date"] is None


def test_list_rows_pico(bigquery_client: bigquery.Client, scalars_table_pico: str):
    rows = bigquery_client.list_rows(
        scalars_table_pico, timestamp_precision=enums.TimestampPrecision.PICOSECOND
    )
    rows = list(rows)
    row = rows[0]
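    # With picosecond precision requested, the timestamp comes back as an
    # RFC 3339 string with 12 fractional digits rather than a datetime, since
    # datetime.datetime only supports microsecond resolution.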
assert row["pico_col"] == "2025-01-01T00:00:00.123456789012Z"


def test_list_rows_pico_truncate(
    bigquery_client: bigquery.Client, scalars_table_pico: str
):
    # For a picosecond timestamp column, if the user does not explicitly set
    # timestamp_precision, the value is returned truncated to microsecond
    # precision.
    rows = bigquery_client.list_rows(scalars_table_pico)
    rows = list(rows)
    row = rows[0]
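    # 1735689600123456 microseconds since the Unix epoch is
    # 2025-01-01T00:00:00.123456 UTC, i.e. the picosecond value from the
    # previous test truncated to microseconds.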
assert row["pico_col"] == "1735689600123456"