파이썬에 대한 간단한 프로세스 기반 병렬 맵이 있습니까?
파이썬에 대한 간단한 프로세스 기반 병렬 맵, 즉 함수를 찾고 있습니다.
parmap(function,[data])
다른 프로세스에서 [data]의 각 요소에 대해 함수를 실행하고 (음, 다른 코어에 있지만 AFAIK, 파이썬의 다른 코어에서 항목을 실행하는 유일한 방법은 여러 인터프리터를 시작하는 것입니다) 결과 목록을 반환합니다. .
이와 같은 것이 존재합니까? 나는 간단한 것을 원 하므로 간단한 모듈이 좋을 것입니다. 물론, 그런 것이 존재하지 않는다면 큰 도서관에 만족할 것입니다 :-/
필요한 것은 multiprocessing.Pool () 의 map 메서드 인 것 같습니다 .
map (func, iterable [, chunksize])
A parallel equivalent of the map() built-in function (it supports only one iterable argument though). It blocks till the result is ready. This method chops the iterable into a number of chunks which it submits to the process pool as separate tasks. The (approximate) size of these chunks can be specified by setting chunksize to a positive integ
예를 들어,이 함수를 매핑하려는 경우 :
def f(x):
return x**2
range (10)에 내장 된 map () 함수를 사용하여 수행 할 수 있습니다.
map(f, range(10))
또는 multiprocessing.Pool () 객체의 메소드 map () 사용 :
import multiprocessing
pool = multiprocessing.Pool()
print pool.map(f, range(10))
나는 이것이 오래된 게시물이라는 것을 알고 있지만, 만약을 대비하여 parmapper 라고 불리는이 매우 쉽고 매우 쉽게 만드는 도구를 작성했습니다 (실제로는 그것을 parmap 이라고 부르지 만 이름은 가져 왔습니다).
프로세스의 많은 설정 및 분해를 처리하고 수많은 기능을 추가합니다. 중요한 순서대로
- 람다 및 기타 피클 할 수없는 함수를 사용할 수 있습니다.
- 직접 사용하기 매우 쉽게 만들기 위해 스타 맵 및 기타 유사한 호출 방법을 적용 할 수 있습니다.
- 스레드 및 / 또는 프로세스간에 분할 가능
- 진행률 표시 줄과 같은 기능 포함
적은 비용이 발생하지만 대부분의 경우 무시할 수 있습니다.
도움이 되셨기를 바랍니다.
(참고 : map
Python 3+에서 와 마찬가지로 iterable을 반환하므로 모든 결과가 즉시 전달 될 것으로 예상되면을 사용하십시오. list()
)
R의 mclapply ()와 동등한 Python을 찾는 사람들을 위해 여기에 내 구현이 있습니다. 다음 두 가지 예를 개선 한 것입니다.
- " 병렬화 판다 맵 () 또는 적용 () "@Rafael 발레로 언급한다.
- 여러 인수가있는 함수에 맵을 적용하는 방법 .
단일 또는 다중 인수가있는 맵 함수에 적용 할 수 있습니다.
import numpy as np, pandas as pd
from scipy import sparse
import functools, multiprocessing
from multiprocessing import Pool
num_cores = multiprocessing.cpu_count()
def parallelize_dataframe(df, func, U=None, V=None):
#blockSize = 5000
num_partitions = 5 # int( np.ceil(df.shape[0]*(1.0/blockSize)) )
blocks = np.array_split(df, num_partitions)
pool = Pool(num_cores)
if V is not None and U is not None:
# apply func with multiple arguments to dataframe (i.e. involves multiple columns)
df = pd.concat(pool.map(functools.partial(func, U=U, V=V), blocks))
else:
# apply func with one argument to dataframe (i.e. involves single column)
df = pd.concat(pool.map(func, blocks))
pool.close()
pool.join()
return df
def square(x):
return x**2
def test_func(data):
print("Process working on: ", data.shape)
data["squareV"] = data["testV"].apply(square)
return data
def vecProd(row, U, V):
return np.sum( np.multiply(U[int(row["obsI"]),:], V[int(row["obsJ"]),:]) )
def mProd_func(data, U, V):
data["predV"] = data.apply( lambda row: vecProd(row, U, V), axis=1 )
return data
def generate_simulated_data():
N, D, nnz, K = [302, 184, 5000, 5]
I = np.random.choice(N, size=nnz, replace=True)
J = np.random.choice(D, size=nnz, replace=True)
vals = np.random.sample(nnz)
sparseY = sparse.csc_matrix((vals, (I, J)), shape=[N, D])
# Generate parameters U and V which could be used to reconstruct the matrix Y
U = np.random.sample(N*K).reshape([N,K])
V = np.random.sample(D*K).reshape([D,K])
return sparseY, U, V
def main():
Y, U, V = generate_simulated_data()
# find row, column indices and obvseved values for sparse matrix Y
(testI, testJ, testV) = sparse.find(Y)
colNames = ["obsI", "obsJ", "testV", "predV", "squareV"]
dtypes = {"obsI":int, "obsJ":int, "testV":float, "predV":float, "squareV": float}
obsValDF = pd.DataFrame(np.zeros((len(testV), len(colNames))), columns=colNames)
obsValDF["obsI"] = testI
obsValDF["obsJ"] = testJ
obsValDF["testV"] = testV
obsValDF = obsValDF.astype(dtype=dtypes)
print("Y.shape: {!s}, #obsVals: {}, obsValDF.shape: {!s}".format(Y.shape, len(testV), obsValDF.shape))
# calculate the square of testVals
obsValDF = parallelize_dataframe(obsValDF, test_func)
# reconstruct prediction of testVals using parameters U and V
obsValDF = parallelize_dataframe(obsValDF, mProd_func, U, V)
print("obsValDF.shape after reconstruction: {!s}".format(obsValDF.shape))
print("First 5 elements of obsValDF:\n", obsValDF.iloc[:5,:])
if __name__ == '__main__':
main()
이는 Python 코드를 쉽게 병렬화하고 배포 할 수있는 시스템 인 Ray로 우아하게 수행 할 수 있습니다 .
예제를 병렬화하려면 @ray.remote
데코레이터로 지도 함수를 정의한 다음 .remote
. 이렇게하면 원격 기능의 모든 인스턴스가 다른 프로세스에서 실행됩니다.
import time
import ray
ray.init()
# Define the function you want to apply map on, as remote function.
@ray.remote
def f(x):
# Do some work...
time.sleep(1)
return x*x
# Define a helper parmap(f, list) function.
# This function executes a copy of f() on each element in "list".
# Each copy of f() runs in a different process.
# Note f.remote(x) returns a future of its result (i.e.,
# an identifier of the result) rather than the result itself.
def parmap(f, list):
return [f.remote(x) for x in list]
# Call parmap() on a list consisting of first 5 integers.
result_ids = parmap(f, range(1, 6))
# Get the results
results = ray.get(result_ids)
print(results)
다음과 같이 인쇄됩니다.
[1, 4, 9, 16, 25]
and it will finish in approximately len(list)/p
(rounded up the nearest integer) where p
is number of cores on your machine. Assuming a machine with 2 cores, our example will execute in 5/2
rounded up, i.e, in approximately 3
sec.
There are a number of advantages of using Ray over the multiprocessing module. In particular, the same code will run on a single machine as well as on a cluster of machines. For more advantages of Ray see this related post.
참고URL : https://stackoverflow.com/questions/1704401/is-there-a-simple-process-based-parallel-map-for-python
'program story' 카테고리의 다른 글
WPF 응용 프로그램에 나타나는 이상한 블랙 박스 (0) | 2020.12.08 |
---|---|
C ++에서 func ()와 (* this) .func ()의 차이점 (0) | 2020.12.08 |
jQuery : first 대 .first () (0) | 2020.12.08 |
명령 줄에서 PHP 코드 문자열 실행 (0) | 2020.12.08 |
Haskell 기능 적용 및 커링 (0) | 2020.12.08 |