Python Pandas로 들어오는 실시간 데이터를 처리하는 방법
Pandas로 실시간 수신 데이터를 처리하는 가장 권장 / 파이썬 방식은 무엇입니까?
몇 초마다 아래 형식의 데이터 포인트를받습니다.
{'time' :'2013-01-01 00:00:00', 'stock' : 'BLAH',
'high' : 4.0, 'low' : 3.0, 'open' : 2.0, 'close' : 1.0}
기존 DataFrame에 추가 한 다음 일부 분석을 실행하고 싶습니다.
문제는 DataFrame.append로 행을 추가하는 것만으로도 모든 복사에서 성능 문제가 발생할 수 있다는 것입니다.
내가 시도한 것 :
몇몇 사람들은 빅 데이터 프레임을 미리 할당하고 데이터가 들어 오면 업데이트 할 것을 제안했습니다.
In [1]: index = pd.DatetimeIndex(start='2013-01-01 00:00:00', freq='S', periods=5)
In [2]: columns = ['high', 'low', 'open', 'close']
In [3]: df = pd.DataFrame(index=t, columns=columns)
In [4]: df
Out[4]:
high low open close
2013-01-01 00:00:00 NaN NaN NaN NaN
2013-01-01 00:00:01 NaN NaN NaN NaN
2013-01-01 00:00:02 NaN NaN NaN NaN
2013-01-01 00:00:03 NaN NaN NaN NaN
2013-01-01 00:00:04 NaN NaN NaN NaN
In [5]: data = {'time' :'2013-01-01 00:00:02', 'stock' : 'BLAH', 'high' : 4.0, 'low' : 3.0, 'open' : 2.0, 'close' : 1.0}
In [6]: data_ = pd.Series(data)
In [7]: df.loc[data['time']] = data_
In [8]: df
Out[8]:
high low open close
2013-01-01 00:00:00 NaN NaN NaN NaN
2013-01-01 00:00:01 NaN NaN NaN NaN
2013-01-01 00:00:02 4 3 2 1
2013-01-01 00:00:03 NaN NaN NaN NaN
2013-01-01 00:00:04 NaN NaN NaN NaN
다른 대안은 사전 목록을 작성하는 것입니다. 들어오는 데이터를 목록에 추가하고 작업을 수행하기 위해 더 작은 DataFrame으로 분할하기 만하면됩니다.
In [9]: ls = []
In [10]: for n in range(5):
.....: # Naive stuff ahead =)
.....: time = '2013-01-01 00:00:0' + str(n)
.....: d = {'time' : time, 'stock' : 'BLAH', 'high' : np.random.rand()*10, 'low' : np.random.rand()*10, 'open' : np.random.rand()*10, 'close' : np.random.rand()*10}
.....: ls.append(d)
In [11]: df = pd.DataFrame(ls[1:3]).set_index('time')
In [12]: df
Out[12]:
close high low open stock
time
2013-01-01 00:00:01 3.270078 1.008289 7.486118 2.180683 BLAH
2013-01-01 00:00:02 3.883586 2.215645 0.051799 2.310823 BLAH
또는 그런 식으로 입력을 조금 더 처리 할 수 있습니다.
다음과 같이 HDF5 / pytables를 사용합니다.
- 데이터를 "가능한 한 오랫동안"파이썬 목록으로 유지하십시오.
- 해당 목록에 결과를 추가하십시오.
- "크게"되면 :
- pandas io (및 추가 가능한 테이블)를 사용하여 HDF5 Store로 푸시합니다.
- 목록을 지우십시오.
- 반복.
실제로 내가 정의한 함수는 각 "키"에 대한 목록을 사용하므로 동일한 프로세스에서 여러 DataFrame을 HDF5 Store에 저장할 수 있습니다.
각 행으로 호출하는 함수를 정의합니다 d
.
CACHE = {}
STORE = 'store.h5' # Note: another option is to keep the actual file open
def process_row(d, key, max_len=5000, _cache=CACHE):
"""
Append row d to the store 'key'.
When the number of items in the key's cache reaches max_len,
append the list of rows to the HDF5 store and clear the list.
"""
# keep the rows for each key separate.
lst = _cache.setdefault(key, [])
if len(lst) >= max_len:
store_and_clear(lst, key)
lst.append(d)
def store_and_clear(lst, key):
"""
Convert key's cache list to a DataFrame and append that to HDF5.
"""
df = pd.DataFrame(lst)
with pd.HDFStore(STORE) as store:
store.append(key, df)
lst.clear()
참고 : with 문을 사용하여 각 쓰기 후 자동으로 저장소를 닫습니다. 이 수 를 열어 둘 빠를 수 있지만, 그렇다면 당신이 정기적으로 세척하는 것이 좋습니다 (플러시를 폐쇄) . 또한 목록보다는 컬렉션 deque를 사용하는 것이 더 읽기 쉬울 수 있지만 여기에서는 목록의 성능이 약간 더 좋습니다.
이것을 사용하려면 다음과 같이 호출하십시오.
process_row({'time' :'2013-01-01 00:00:00', 'stock' : 'BLAH', 'high' : 4.0, 'low' : 3.0, 'open' : 2.0, 'close' : 1.0},
key="df")
Note: "df" is the stored key used in the pytables store.
Once the job has finished ensure you store_and_clear
the remaining cache:
for k, lst in CACHE.items(): # you can instead use .iteritems() in python 2
store_and_clear(lst, k)
Now your complete DataFrame is available via:
with pd.HDFStore(STORE) as store:
df = store["df"] # other keys will be store[key]
Some comments:
- 5000 can be adjusted, try with some smaller/larger numbers to suit your needs.
- List append is O(1), DataFrame append is O(
len(df)
). - Until you're doing stats or data-munging you don't need pandas, use what's fastest.
- This code works with multiple key's (data points) coming in.
- This is very little code, and we're staying in vanilla python list and then pandas dataframe...
Additionally, to get the up to date reads you could define a get method which stores and clears before reading. In this way you would get the most up to date data:
def get_latest(key, _cache=CACHE):
store_and_clear(_cache[key], key)
with pd.HDFStore(STORE) as store:
return store[key]
Now when you access with:
df = get_latest("df")
you'll get the latest "df" available.
Another option is slightly more involved: define a custom table in vanilla pytables, see the tutorial.
Note: You need to know the field-names to create the column descriptor.
You are actually trying to solve two problems: capturing real-time data and analyzing that data. The first problem can be solved with Python logging, which is designed for this purpose. Then the other problem can be solved by reading that same log file.
ReferenceURL : https://stackoverflow.com/questions/16740887/how-to-handle-incoming-real-time-data-with-python-pandas
'program story' 카테고리의 다른 글
C ++ 로깅 프레임 워크 제안 (0) | 2020.12.26 |
---|---|
.NET Webbrowser 컨트롤에서 메모리 누수를 피하는 방법은 무엇입니까? (0) | 2020.12.26 |
git-subtree 추가 후 rebase하는 방법은 무엇입니까? (0) | 2020.12.26 |
스칼라 : 함수 인수에서 튜플 분해 (0) | 2020.12.25 |
GreenDAO 스키마 업데이트 및 데이터 마이그레이션? (0) | 2020.12.25 |