# Copyright (c) 2023 Baidu, Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
wrapper for pyarrow.Table
"""
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
import pyarrow
import pyarrow.compute as pc
from pyarrow import Table as PyarrowTable
from typing_extensions import Self
from qianfan.dataset.consts import (
QianfanDataGroupColumnName,
QianfanDatasetPackColumnName,
)
from qianfan.dataset.process_interface import (
Addable,
Listable,
Processable,
)
from qianfan.dataset.table_utils import _construct_table_from_nest_sequence
from qianfan.utils import log_debug, log_error, log_info, log_warn
from qianfan.utils.pydantic import BaseModel
def _create_new_table_for_add(
elem: Union[
List[List[Dict]], List[Dict], Tuple[Dict], Dict, List[str], Tuple[str], str
],
is_dataset_packed: bool = False,
add_new_group: bool = False,
is_grouped: bool = True,
group_id: int = -1,
**kwargs: Any,
) -> PyarrowTable:
if isinstance(elem, (list, tuple)):
log_info("add a sequence object to table")
if not elem:
err_msg = "element is empty"
log_error(err_msg)
raise ValueError(err_msg)
log_debug(f"append row data: {elem}")
if is_dataset_packed:
log_info("enter packed appending logic")
if isinstance(elem[0], dict):
return pyarrow.Table.from_pydict({QianfanDatasetPackColumnName: [elem]})
elif isinstance(elem[0], list) and isinstance(elem[0][0], dict):
return pyarrow.Table.from_pydict({QianfanDatasetPackColumnName: elem})
elif isinstance(elem[0], str):
return pyarrow.Table.from_pydict({QianfanDatasetPackColumnName: elem})
else:
err_msg = f"element cannot be instance of {type(elem)}"
log_error(err_msg)
raise ValueError(err_msg)
if not isinstance(elem[0], dict):
err_msg = (
"element in sequence-like "
"container cannot be instance of"
f" {type(elem[0])}"
)
log_error(err_msg)
raise ValueError(err_msg)
# TODO 是否需要做深拷贝?
tables: List = list(elem)
if group_id != -1:
log_info("enter grouped appending logic")
if not add_new_group:
for table in tables:
table[QianfanDataGroupColumnName] = group_id
elif is_grouped:
for table in tables:
table[QianfanDataGroupColumnName] = group_id + 1
else:
for i in range(len(tables)):
table = tables[i]
table[QianfanDataGroupColumnName] = group_id + i + 1
log_debug(f"row data after processing: {table}")
return pyarrow.Table.from_pylist(tables)
elif isinstance(elem, dict):
log_info("add a dict object to table")
if is_dataset_packed:
elem = {QianfanDatasetPackColumnName: [elem]}
elif group_id != -1:
elem[QianfanDataGroupColumnName] = group_id + (1 if add_new_group else 0)
log_debug(f"row data after processing: {elem}")
return pyarrow.Table.from_pylist([elem])
elif isinstance(elem, str):
if not is_dataset_packed:
err_msg = "can't add string when your table isn't packed"
log_error(err_msg)
raise ValueError(err_msg)
return pyarrow.Table.from_pylist([{QianfanDatasetPackColumnName: elem}])
else:
err_msg = f"element cannot be instance of {type(elem)}"
log_error(err_msg)
raise ValueError(err_msg)
def _whether_dataset_is_packed(col_names: List[str]) -> bool:
return col_names == [QianfanDatasetPackColumnName]
def _whether_dataset_is_grouped(col_names: List[str]) -> bool:
return QianfanDataGroupColumnName in col_names
class _PyarrowRowManipulator(BaseModel, Addable, Listable, Processable):
"""handler for processing of pyarrow table row"""
class Config:
arbitrary_types_allowed = True
table: PyarrowTable
def _inner_table_is_packed(self) -> bool:
return _whether_dataset_is_packed(self.table.column_names)
def _inner_table_is_grouped(self) -> bool:
return _whether_dataset_is_grouped(self.table.column_names)
def append(
self,
elem: Union[List[Dict], Tuple[Dict], Dict],
is_dataset_packed: bool = False,
add_new_group: bool = False,
is_grouped: bool = True,
group_id: int = -1,
**kwargs: Any,
) -> Self:
"""
append element(s) to pyarrow table
Args:
elem (Union[List[Dict], Tuple[Dict], Dict]):
element(s) added to pyarrow table
is_dataset_packed (bool): whether table is packed, default to False.
add_new_group (bool): whether elem has new group id, default to False.
is_grouped (bool): whether elem is grouped, default to True.
group_id (int): new group id, default to -1.
**kwargs (Any): optional arguments
Returns:
Self: a new pyarrow table
"""
return pyarrow.concat_tables(
[
self.table,
_create_new_table_for_add(
elem,
is_dataset_packed,
add_new_group,
is_grouped,
group_id,
**kwargs,
),
],
promote=True,
)
def insert(
self,
elem: Union[List[Dict], Tuple[Dict], Dict],
index: int,
is_dataset_packed: bool = False,
add_new_group: bool = False,
is_grouped: bool = True,
group_id: int = -1,
**kwargs: Any,
) -> Self:
"""
insert element(s) to pyarrow table
Args:
elem (Union[List[Dict], Tuple[Dict], Dict]):
element(s) added to pyarrow table
index (int): where to insert element(s).
is_dataset_packed (bool): whether table is packed, default to False.
add_new_group (bool): whether elem has new group id, default to False.
is_grouped (bool): whether elem is grouped, default to True.
group_id (int): new group id, default to -1.
**kwargs (Any): optional arguments
Returns:
Self: a new pyarrow table
"""
table_length = self.table.num_rows
if index < 0 or index > table_length:
err_msg = f"can't insert element at {index}"
log_error(err_msg)
raise ValueError(err_msg)
if index == table_length:
return self.append(
elem, is_dataset_packed, add_new_group, is_grouped, group_id, **kwargs
)
new_table = _create_new_table_for_add(
elem, is_dataset_packed, add_new_group, is_grouped, group_id, **kwargs
)
if index == 0:
return pyarrow.concat_tables([new_table, self.table], promote=True)
table_front_part = self.table.slice(length=index)
table_rear_part = self.table.slice(index)
return pyarrow.concat_tables(
[table_front_part, new_table, table_rear_part], promote=True
)
def list(
self, by: Optional[Union[slice, int, str, Sequence[int], Sequence[str]]] = None
) -> Any:
"""
get element(s) from pyarrow table
Args:
by (Optional[Union[slice, int, Sequence[int]]]):
index or indices for elements, default to None, in which case
return a python list of pyarrow table row
Returns:
Any: pyarrow table row list
"""
if isinstance(by, str) or (
isinstance(by, (list, tuple)) and isinstance(by[0], str)
):
raise ValueError("cannot get row from table by str")
if self._inner_table_is_packed():
if by is None:
return self.table.to_pydict()[QianfanDatasetPackColumnName]
if isinstance(by, int):
return self.table.take([by]).to_pylist()[0][
QianfanDatasetPackColumnName
]
elif isinstance(by, (list, tuple)):
return self.table.take(list(by)).to_pydict()[
QianfanDatasetPackColumnName
]
elif isinstance(by, slice):
return self.table.slice(
offset=by.start, length=by.stop - by.start + 1
).to_pydict()[QianfanDatasetPackColumnName]
else:
raise ValueError(
f"unsupported key type {type(by)} when get row from table"
)
if by is None:
return self.table.to_pylist()
if isinstance(by, int):
return self.table.take([by]).to_pylist()[0]
elif isinstance(by, (list, tuple)):
return self.table.take(list(by)).to_pylist()
elif isinstance(by, slice):
return self.table.slice(
offset=by.start, length=by.stop - by.start + 1
).to_pylist()
else:
raise ValueError(f"unsupported key type {type(by)} when get row from table")
def map(self, op: Callable[[Any], Any]) -> Self:
"""
map on pyarrow table's row
Args:
op (Callable[[Any], Any]): handler used to map
Returns:
Self: a new pyarrow table
"""
# 构建出的新 table 会按照首行的 key 作为 columns
if self._inner_table_is_packed():
new_list: List[Union[List[Dict[str, Any]], str]] = []
for row in self.table.column(QianfanDatasetPackColumnName).to_pylist():
returned_data = op(row)
if not returned_data:
log_warn("a row has been deleted from table")
continue
if not isinstance(returned_data, (list, str)):
raise ValueError(
"returned value isn't list or str, rather"
f" {type(returned_data)}"
)
new_list.append(returned_data)
return pyarrow.Table.from_pydict({QianfanDatasetPackColumnName: new_list})
else:
new_table: List[Dict[str, Any]] = []
is_grouped = self._inner_table_is_grouped()
for row_index in range(self.table.num_rows):
origin_data = self.table.take([row_index]).to_pylist()[0]
input_dict = {key: val for key, val in origin_data.items()}
group_number = (
None if not is_grouped else input_dict[QianfanDataGroupColumnName]
)
returned_data = op(input_dict)
if not returned_data:
log_warn("a row has been deleted from table")
continue
if not isinstance(returned_data, dict):
raise ValueError("returned value isn't dict")
if is_grouped and QianfanDataGroupColumnName not in returned_data:
returned_data[QianfanDataGroupColumnName] = group_number
new_table.append(returned_data)
return pyarrow.Table.from_pylist(new_table)
def filter(self, op: Callable[[Any], bool]) -> Self:
"""
filter on pyarrow table's row
Args:
op (Callable[[Any], bool]): handler used to filter
Returns:
Self: a new pyarrow table
"""
selection_masks: List[bool] = []
if self._inner_table_is_packed():
for row in self.table.column(QianfanDatasetPackColumnName).to_pylist():
flag = op(row)
if flag is None:
raise ValueError("cant return None")
if not isinstance(flag, bool):
raise ValueError("returned value isn't bool")
selection_masks.append(flag)
else:
for row_index in range(self.table.num_rows):
origin_data = self.table.take([row_index]).to_pylist()[0]
input_dict = {key: val for key, val in origin_data.items()}
flag = op(input_dict)
if flag is None:
raise ValueError("cant return None")
if not isinstance(flag, bool):
raise ValueError("returned value isn't bool")
selection_masks.append(flag)
return self.table.filter(mask=selection_masks)
def delete(self, index: Union[int, str]) -> Self:
"""
delete an element from pyarrow table
Args:
index (Union[int, str]): element index to delete
Returns:
Self: a new pyarrow table
"""
if isinstance(index, str):
raise ValueError("cannot delete row by str")
table_length = self.table.num_rows
if index < 0 or index >= table_length:
raise OverflowError(f"index overflow, table length is {table_length}")
if index == 0:
return self.table.slice(1)
elif index == table_length - 1:
return self.table.slice(0, table_length - 1)
return pyarrow.concat_tables(
[self.table.slice(0, index), self.table.slice(index + 1)]
)
class _PyarrowColumnManipulator(BaseModel, Addable, Listable, Processable):
"""handler for processing of pyarrow table column"""
class Config:
arbitrary_types_allowed = True
table: PyarrowTable
def append(self, elem: Dict[str, List]) -> Self:
"""
append a row to pyarrow table
Args:
elem (Dict[str, List]): dict containing element added to pyarrow table
key as column name, value as column data
Returns:
Self: a new pyarrow table
"""
if not isinstance(elem, dict):
raise ValueError(f"element appended must be dict, not {type(elem)}")
for name, data in elem.items():
if name in self.table.column_names:
raise ValueError(f"column name {name} has been in dataset column list")
if not isinstance(data, list):
raise TypeError(f"data isn't list, rather than {type(data)}")
if len(data) != self.table.num_rows:
raise ValueError(
f"the length of data need to be {self.table.num_rows}, rather than"
f" {len(data)}"
)
self.table = self.table.append_column(name, [data])
return self.table
def insert(self, elem: Dict[str, List], index: int) -> Self:
"""
insert a row to pyarrow table
Args:
elem (Dict[str, List]): dict containing element added to pyarrow table
must has column name "name" and column data list "data"
index (int): where to insert new column
Returns:
Self: a new pyarrow table
"""
col_length = self.table.num_columns
if index < 0 or index > col_length:
err_msg = f"can't insert column at {index}"
log_error(err_msg)
raise ValueError(err_msg)
if index == col_length:
return self.append(elem)
return self.table.add_column(index, elem["name"], [elem["data"]])
def list(
self, by: Optional[Union[slice, int, str, Sequence[int], Sequence[str]]] = None
) -> Any:
"""
get column(s) from pyarrow table
Args:
by (Optional[Union[int, str, Sequence[int], Sequence[str]]]):
index or indices for columns, default to None, in which case
return a python list of pyarrow table column
Returns:
Any: pyarrow table column list
"""
if by is None:
return self.table.to_pydict()
if isinstance(by, slice):
raise ValueError("cannot get column by slice")
if isinstance(by, (int, str)):
indices: Any = [by]
else:
indices = by
if isinstance(indices[0], str) and not set(indices).issubset(
set(self.table.column_names)
):
raise ValueError(f"contain not existed column name: {indices}")
return self.table.select(list(indices)).to_pydict()
def map(self, op: Callable[[Any], Any]) -> Self:
"""
map on pyarrow table's column
Args:
op (Callable[[Any], Any]): handler used to map
Returns:
Self: a new pyarrow table
"""
new_columns: Dict[str, List[Any]] = {}
for i in range(self.table.num_columns):
column = self.table.select([i]).to_pydict()
ret_column = op(column)
new_columns.update(ret_column)
return pyarrow.Table.from_pydict(new_columns)
def filter(self, op: Callable[[Any], bool]) -> Self:
"""
filter on pyarrow table's column
Args:
op (Callable[[Any], bool]): handler used to filter
Returns:
Self: a new pyarrow table
"""
dropped_column_name = []
for i in range(self.table.num_columns):
column = self.table.select([i]).to_pydict()
if not op(column):
dropped_column_name += list(column.keys())
return self.table.drop_columns(dropped_column_name)
def delete(self, index: Union[int, str]) -> Self:
"""
delete an column from pyarrow table
Args:
index (str): column name to delete
Returns:
Self: a new pyarrow table
"""
if isinstance(index, int):
raise ValueError("cannot delete column by int")
return self.table.drop_columns(index)
def col_renames(self, new_names: List[str]) -> Self:
"""
rename all dataset column
Args:
new_names (List[str]): All new names for columns
Returns:
Self: a new pyarrow table
"""
if (
_whether_dataset_is_grouped(self.table.column_names)
and QianfanDataGroupColumnName not in new_names
):
i = self.table.column_names.index(QianfanDataGroupColumnName)
new_names.insert(i, QianfanDataGroupColumnName)
return self.table.rename_columns(new_names)
[docs]class Table(Addable, Listable, Processable):
"""
dataset representation on memory
inherited from pyarrow.Table,implementing interface in process_interface.py
"""
def __init__(self, inner_table: PyarrowTable) -> None:
"""
Init a Table object
Args:
inner_table (PyarrowTable): a pyarrow.Table object wrapped by Table
"""
# 内部使用的 pyarrow.Table 对象
self.inner_table: PyarrowTable = inner_table
def _row_op(self) -> _PyarrowRowManipulator:
return _PyarrowRowManipulator(table=self.inner_table)
def _col_op(self) -> _PyarrowColumnManipulator:
return _PyarrowColumnManipulator(table=self.inner_table)
[docs] def is_dataset_packed(self) -> bool:
return _whether_dataset_is_packed(self.inner_table.column_names)
[docs] def is_dataset_grouped(self) -> bool:
return _whether_dataset_is_grouped(self.inner_table.column_names)
def _squash_group_number(self) -> None:
if not self.is_dataset_grouped():
log_warn("squash group number when table isn't grouped")
return
self.inner_table = self.inner_table.sort_by(QianfanDataGroupColumnName)
group_column_list = self.col_list(QianfanDataGroupColumnName)[
QianfanDataGroupColumnName
]
last_appeared_number = group_column_list[0]
current_group_number = 0
new_group_column_list = [0]
for i in range(1, len(group_column_list)):
num = group_column_list[i]
if num != last_appeared_number:
last_appeared_number = num
current_group_number += 1
new_group_column_list.append(current_group_number)
self.col_delete(QianfanDataGroupColumnName)
self.col_append({QianfanDataGroupColumnName: new_group_column_list})
return
[docs] def pack(self) -> bool:
"""
pack all group into 1 row
and make table array-like with single column
Returns:
bool: whether packing succeeded
"""
if QianfanDataGroupColumnName not in self.col_names():
log_error("can't pack a dataset without '_group' column")
return False
if len(self.col_names()) == 1:
log_error("can't pack a dataset only with '_group' column")
return False
if self.inner_table.column(QianfanDataGroupColumnName).null_count:
log_error("can't pack a dataset when column '_group' has None")
return False
self._squash_group_number()
inner_index = "_index"
group_ordered_table: pyarrow.Table = self.inner_table.append_column(
inner_index, [list(range(self.row_number()))]
).sort_by(QianfanDataGroupColumnName)
result_list: List[List[Dict[str, Any]]] = []
for row in group_ordered_table.to_pylist():
group_index = row[QianfanDataGroupColumnName]
if group_index < 0:
log_error(
f"row {row[inner_index]} has illegal group value:"
f" {row[QianfanDataGroupColumnName]}"
)
return False
row.pop(inner_index)
row.pop(QianfanDataGroupColumnName)
while group_index >= len(result_list):
result_list.append([])
result_list[group_index].append(row)
self.inner_table = pyarrow.Table.from_pydict(
{QianfanDatasetPackColumnName: result_list}
)
return True
[docs] def unpack(self) -> bool:
"""
unpack all element in the row "_pack"
make sure the element in the column "_pack"
is Sequence[Dict[str, Any]]
Returns:
bool: whether unpacking succeeded
"""
if QianfanDatasetPackColumnName not in self.col_names():
log_warn("can't pack a dataset without '_pack' column")
return False
if len(self.col_names()) != 1:
log_warn("dataset should only contain '_pack' column")
return False
if self.inner_table.column(QianfanDatasetPackColumnName).null_count:
log_warn("can't unpack a dataset when column '_pack' has None")
return False
element = self.list(0)
if not (
isinstance(element, (list, tuple))
and element
and isinstance(element[0], dict)
):
log_warn(f"dataset has element not supported: {element}")
return False
data_list = self.to_pydict()[QianfanDatasetPackColumnName]
self.inner_table = _construct_table_from_nest_sequence(data_list)
return True
# 直接调用 Table 对象的接口方法都默认是在行上做处理
[docs] def map(self, op: Callable[[Any], Any]) -> Self:
"""
map on pyarrow table's row
Args:
op (Callable[[Any], Any]): handler used to map
Returns:
Self: Table itself
"""
manipulator = self._row_op()
self.inner_table = manipulator.map(op) # noqa
return self
[docs] def filter(self, op: Callable[[Any], bool]) -> Self:
"""
filter on pyarrow table's row
Args:
op (Callable[[Any], bool]): handler used to filter
Returns:
Self: Table itself
"""
manipulator = self._row_op()
self.inner_table = manipulator.filter(op)
return self
[docs] def delete(self, index: Union[int, str]) -> Self:
"""
delete an element from pyarrow table
Args:
index (Union[int, str]): element index to delete
Returns:
Self: Table itself
"""
manipulator = self._row_op()
self.inner_table = manipulator.delete(index)
return self
def _calculate_kwargs_for_add(
self, add_new_group: bool = False, is_grouped: bool = True, group_id: int = -1
) -> Dict[str, Any]:
kwargs: Dict[str, Any] = {}
if self.is_dataset_grouped():
if group_id != -1:
kwargs = {
"group_id": group_id - 1,
"add_new_group": True,
"is_grouped": is_grouped,
}
return kwargs
group_column: pyarrow.ChunkedArray = self.inner_table.column(
QianfanDataGroupColumnName
)
calculated_group_id = pc.max(group_column, min_count=0).as_py()
kwargs = {
"group_id": calculated_group_id,
"add_new_group": add_new_group,
"is_grouped": is_grouped,
}
elif self.is_dataset_packed():
kwargs = {"is_dataset_packed": True}
return kwargs
[docs] def append(
self, elem: Any, add_new_group: bool = False, is_grouped: bool = True
) -> Self:
"""
append an element to pyarrow table
Args:
elem (Union[List[Dict], Tuple[Dict], Dict]): Elements added to pyarrow table
add_new_group (bool):
Whether elem has a new group id.
Only used when table is grouped.
is_grouped (bool):
Are element in elem in same group.
Only used when table is grouped and elem is Sequence
and add_new_group was set True.
Default to True, all elements
will be in same group.
If it's True, each element will have
sequential incremental group id from last
available group id.
Returns:
Self: Table itself
"""
manipulator = self._row_op()
self.inner_table = manipulator.append(
elem, **self._calculate_kwargs_for_add(add_new_group, is_grouped)
)
return self
[docs] def insert(
self,
elem: Any,
index: Any,
group_id: int = -1,
add_new_group: bool = False,
is_grouped: bool = True,
) -> Self:
"""
insert an element to pyarrow table
Args:
elem (Union[List[Dict], Tuple[Dict], Dict]): Elements added to pyarrow table
index (int): where to insert element(s)
group_id (int):
which group id you want to apply to new element(s).
Default to -1, which means let group id be automatically
inferred from table.
add_new_group (bool):
Whether elem has a new group id.
Only used when table is grouped
and group_id is -1
is_grouped (bool):
Are element in elem in same group.
Only used when table is grouped and elem is Sequence
and add_new_group was set True.
Default to True, all elements
will be in same group.
If it's True, each element will have
sequential incremental group id from last
available group id.
Returns:
Self: Table itself
"""
manipulator = self._row_op()
self.inner_table = manipulator.insert(
elem,
index,
**self._calculate_kwargs_for_add(add_new_group, is_grouped, group_id),
)
return self
[docs] def list(
self, by: Optional[Union[slice, int, str, Sequence[int], Sequence[str]]] = None
) -> Any:
"""
get element(s) from pyarrow table
Args:
by (Optional[Union[slice, int, Sequence[int]]]):
index or indices for elements, default to None, in which case
return a python list of pyarrow table row
Returns:
Any: pyarrow table row list
"""
manipulator = self._row_op()
return manipulator.list(by)
[docs] def col_map(self, op: Callable[[Any], Any]) -> Self:
"""
map on pyarrow table's column
Args:
op (Callable[[Any], Any]): handler used to map
Returns:
Self: Table itself
"""
manipulator = self._col_op()
self.inner_table = manipulator.map(op) # noqa
return self
[docs] def col_filter(self, op: Callable[[Any], bool]) -> Self:
"""
filter on pyarrow table's column
Args:
op (Callable[[Any], bool]): handler used to filter
Returns:
Self: Table itself
"""
manipulator = self._col_op()
self.inner_table = manipulator.filter(op)
return self
[docs] def col_delete(self, index: Union[int, str]) -> Self:
"""
delete a column from pyarrow table
Args:
index (str): column name to delete
Returns:
Self: Table itself
"""
manipulator = self._col_op()
self.inner_table = manipulator.delete(index)
return self
[docs] def col_append(self, elem: Any) -> Self:
"""
append a row to pyarrow table
Args:
elem (Dict[str, List]): dict containing element added to pyarrow table
key as column name, value as column data
Returns:
Self: Table itself
"""
manipulator = self._col_op()
self.inner_table = manipulator.append(elem)
return self
[docs] def col_insert(self, elem: Any, index: Any) -> Self:
"""
append a row to pyarrow table
Args:
elem (Dict[str, List]): dict containing element added to pyarrow table
must has column name "name" and column data list "data"
index (int): where to insert new column
Returns:
Self: Table itself
"""
manipulator = self._col_op()
self.inner_table = manipulator.insert(elem, index)
return self
[docs] def col_list(
self, by: Optional[Union[slice, int, str, Sequence[int], Sequence[str]]] = None
) -> Any:
"""
get column(s) from pyarrow table
Args:
by (Optional[Union[int, str, Sequence[int], Sequence[str]]]):
index or indices for columns, default to None, in which case
return a python list of pyarrow table column
Returns:
Any: pyarrow table column list
"""
manipulator = self._col_op()
return manipulator.list(by)
[docs] def col_names(self) -> List[str]:
"""
get column name list
Returns:
List[str]: column name list
"""
return self.inner_table.column_names
[docs] def col_renames(self, new_names: List[str]) -> Self:
"""
rename all dataset column
Args:
new_names (List[str]): All new names for columns
Returns:
Self: A brand-new Table with new name
"""
manipulator = self._col_op()
self.inner_table = manipulator.col_renames(new_names)
return self
# 重写 get 和 del 的魔法方法
def __getitem__(self, key: Any) -> Any:
if isinstance(key, str) or (
isinstance(key, Sequence) and isinstance(key[0], str)
):
return self.col_list(key)
return self.list(key)
def __delitem__(self, key: Any) -> None:
if isinstance(key, str):
self.col_delete(key)
elif isinstance(key, int):
self.delete(key)
else:
raise ValueError(f"Unsupported key type {type(key)}")
def __len__(self) -> int:
return self.row_number()
[docs] def row_number(self) -> int:
"""
get pyarrow table row count。
Returns:
int: row count。
"""
return self.inner_table.num_rows
[docs] def column_number(self) -> int:
"""
get pyarrow table column count。
Returns:
int: column count。
"""
return self.inner_table.num_columns
[docs] def to_pylist(self) -> List:
"""
convert a pyarrow table to list
Returns:
List: a list
"""
return self.inner_table.to_pylist()
[docs] def to_pydict(self) -> Dict:
"""
convert a pyarrow table to dict
Returns:
Dict: a dict
"""
return self.inner_table.to_pydict()