-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathparser.py
362 lines (330 loc) · 17.6 KB
/
parser.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
import time
import asyncio
import aiohttp
from lxml import html as lxml_html
from settings import DEBUG, RATING, CHUNK, TIMEOUT, AVAILABLE, BRANDS, SHOPS
if DEBUG:
    # PEP 8 (E731): use a def instead of binding a lambda to a name.
    # Wall-clock start marker used by the per-method progress reporting below.
    def tic() -> float:
        return time.time()
class Parser:
    """Async scrapers for running-shoe data on kant.ru: catalog URLs,
    product details, prices and per-shop availability."""

    @staticmethod
    async def parse_main(urls: list, finish: int) -> list:
        """Walk paginated catalog sections and collect product-page URLs.

        urls   -- catalog section URLs, each starting with http(s)://www.kant.ru
        finish -- maximum pagination depth to try per section (1..30)

        Returns a flat list of product URLs for running shoes.
        Raises TypeError/ValueError on malformed arguments.
        """
        async def main_page_urls(section_url: str, page_number: int) -> list:
            # Fetch one paginated catalog page and return links to items
            # whose name contains 'кроссовки' or 'марафонки'.
            # An empty list signals "stop paginating" to the caller.
            async with aiohttp.ClientSession() as session:
                async with session.get(section_url, params={'PAGEN_1': page_number}) as response:
                    html = await response.text()
            if "kant__catalog__item" not in html:
                # Not a catalog page with shoe items. Stop coroutines!
                return list()
            found = list()
            tree = lxml_html.fromstring(html)
            for a_tag in tree.xpath("//div[@class='kant__catalog__item']//a"):
                item_name = a_tag.values()[1].lower()
                if 'кроссовки' in item_name or 'марафонки' in item_name:
                    found.append('https://www.kant.ru{}'.format(a_tag.values()[0]))
            return found

        # Fail fast on malformed input before any network work.
        if type(urls) is not list:
            raise TypeError('Set list of str urls, not more')
        for url in urls:
            if not (url.startswith('https://www.kant.ru') or url.startswith('http://www.kant.ru')):
                raise ValueError('Set correct value to urls, starts with: http(s)://www.kant.ru')
        if type(finish) is not int:
            raise TypeError('Set correct value to max pagination: from 1 to 30')
        if finish < 1 or finish > 30:
            raise ValueError('Set correct value to max pagination: from 1 to 30')
        if DEBUG:
            now = tic()

            def tac() -> str:  # elapsed time for the progress bar (PEP 8: def, not lambda)
                return "{:.2f}sec".format(time.time() - now)

            print('\r\n>>> Start parse_main at ', time.strftime('%H:%M:%S', time.localtime()))
        # Parse each section's pages in chunks of concurrent tasks.
        chunk = CHUNK
        solution_urls = list()
        all_urls = len(urls)
        for i, page_url in enumerate(urls):
            tasks = list()
            items_urls = list()
            do_search = True
            for pagination in range(1, finish + 1):
                if DEBUG:
                    # progress bar
                    print('\r{}, progress: {}/ {}. Now {}-th page of {}\r'.format(
                        tac(), i + 1, all_urls, pagination, page_url[20:]), end='')
                tasks.append(asyncio.create_task(main_page_urls(page_url, pagination)))
                if len(tasks) == chunk or pagination == finish:
                    # BUG FIX: the gather loop variable used to shadow the
                    # outer 'urls' argument; renamed to 'page_links'.
                    for page_links in await asyncio.gather(*tasks):
                        if page_links and page_links[0] not in items_urls:
                            # Links found and not yet seen: keep looking.
                            items_urls.extend(page_links)
                        else:
                            # Empty page or repeated content: stop paginating.
                            do_search = False
                    tasks = list()
                    await asyncio.sleep(TIMEOUT)
                if not do_search:
                    break
            # BUG FIX: this extend used to run once per gathered chunk,
            # re-adding every earlier chunk's links for the same section;
            # extend once per section instead.
            solution_urls.extend(items_urls)
        if DEBUG:
            print('>>> End parse_main on {} sec. Find {} urls.\n'.format(tac(), len(solution_urls)))
        return solution_urls
@staticmethod
async def parse_details(urls: list) -> list:
"""
Parsed list urls, format 'https://www.kant.ru/catalog/product/123456(78)/'
to full details info:
code-- primary unic value to operate of each items from db
brand-- item brand name, mostly commons with names from setting.BRANDS
model-- unic item name
url-- this func argv
img-- url to item img, small pic
age-- 'взрослый', 'юниор', 'детский' and may be smth else
gender-- 'мужской', 'женский', 'man', 'woman', , 'унисекс', 'для мальчика', 'для девочек'
year-- '2020', '2021', '2020-2021', '20-2021' and may be smth else
use-- 'грунт', 'асфальт', 'снег/ лед', may be smth else
pronation-- 'нейтральная', 'нейтральная/ нейтральная', 'с поддержкой' and more
article-- unic value within model name of brand
season-- 'лето', 'демисезон', 'зима' and more
rating-- program rate for ordering and analytics items data by changes prices of items or change availability
timestamp-- stamp to update
return items info by list of tuples
"""
async def parse(_url: str, _timestamp: str) -> tuple:
"""
Parsed url of item by aiohttp.ClientSession.get and lxml.html
return tuple of full item info
"""
async with aiohttp.ClientSession() as session:
async with session.get(_url) as response:
html = await response.text()
if not html:
return None, None # output need tuple return, full description item card
code = brand = model = img = None
age = gender = article = season = use = pronation = ''
year = 0
tree = lxml_html.fromstring(html)
running = False # are the running shoes for sure?
# commons attrs from xpath objs: values, text, xpath, text_content, keys, label, items, base, attrib
for item in tree.xpath("//div[@class='kant__product__detail-item']"): # product description card
column = item.xpath("span[1]/text()")[0]
if len(item.xpath("span[2]/text()")) > 0:
value = item.xpath("span[2]/text()")[0]
if (column == 'Назначение' and 'бег' in value) or \
(column == 'Тип' and 'кроссовки' in value or 'марафонки' in value):
running = True
if column == 'Бренд':
brand = value.lower()
if running: # card description found!
name = tree.xpath("//div[@id='kantMainCardProduct']/h1/text()")[0].lower()
if brand is None:
if 'кроссовки' in name or 'марафонки' in name:
temp = [i for i in name.split() if i in [j.lower() for j in BRANDS]] # brandname from item name
if temp:
brand = temp[0]
model = name.partition(brand)[2].strip()
brand = brand.title()
code = tree.xpath("//div[@class='kant__product__code']/strong/text()")[0]
code = int(code) if code.isdecimal() else 0
if tree.xpath("//div[@class='kant__product__color__thumbs']//img"):
img = 'https://www.kant.ru' + \
tree.xpath("//div[@class='kant__product__color__thumbs']//img")[0].values()[0]
else:
img = 'https://www.kant.ru'
for item in tree.xpath("//div[@class='kant__product__detail-item']"):
column = str(item.xpath("span[1]/text()")[0])
if len(item.xpath("span[2]/text()")) > 0:
value = str(item.xpath("span[2]/text()")[0])
if column == 'Возраст':
age = value
if column == 'Пол':
gender = value
# sometimes year like 2021-22 (not 2021, or not 2022), produces on 2021 (earlest date)
# in fact. So set one first earliest date
if column == 'Модельный год':
value = value.partition('-')[0]
year = int(value) if value.isdecimal() else 0
if column == 'Покрытие':
use = value
if column == 'Пронация':
pronation = value
if column == 'Артикул':
article = value
if column == 'Сезон':
season = value
# special to Saucony brand:
if brand == 'Saucony' and model.startswith('s-'):
age = 'junior' if age == '' else age
# end special Saucony
#
# special to Hoka brand:
if brand == 'Hoka':
if model.startswith('m '):
gender = 'man' if gender == '' else gender
elif model.startswith('w '):
gender = 'woman' if gender == '' else gender
model = model[2:]
# end special Hoka
# TODO more specific setting for other brands or other keys or agregate more column to one in future
return code, brand, model, _url, img, age, gender, year, use, pronation, article, season, RATING, _timestamp
# check urls content on correct with prev call func parse_main.main_page_urls
if type(urls) is not list:
raise TypeError
# check url format
if not urls[0].startswith('https://www.kant.ru'):
raise ValueError
if DEBUG:
now = tic()
tac = lambda: '{:.2f}sec'.format(time.time() - now)
print('\r\n>>> Start parse_details at ', time.strftime('%H:%M:%S', time.localtime()))
all_urls = len(urls)
products = list()
tasks = list()
timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
for i, url in enumerate(urls):
tasks.append(asyncio.create_task(parse(url, timestamp)))
if len(tasks) == CHUNK or i+1 == all_urls:
new = await asyncio.gather(*tasks)
new = [i for i in new if i[0] is not None] # valid parsing from async def 'parse'
products.extend(new)
tasks = list()
await asyncio.sleep(TIMEOUT)
if DEBUG:
print('\r{} sec, {}/ {}\r'.format(tac(), i+1, all_urls), end='') # progress bar
if DEBUG:
print('>>> End parse_details on {} sec. Parsed {} items.\n'.format(tac(), len(products)))
return products # [(code, brand, model, url, img, age..), (code, brand, model, ..), ...,]
@staticmethod
async def parse_price(codes_urls: list) -> list:
async def get_and_parse(code, url: str) -> tuple:
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
html = await response.text()
if not html:
return None, None
tree = lxml_html.fromstring(html)
if tree.xpath("//div[@class='kant__product__price']/span[2]/text()"):
price = ''.join(tree.xpath("//div[@class='kant__product__price']/span[2]/text()")[0].split(' '))
price = int(price) if price.isdecimal() else 0
else:
price = 0
return code, price
# check correct income data, types of pairs: '(code, url)'
if not (type(codes_urls) is list and type(codes_urls[0]) is tuple and type(codes_urls[0][0]) is int
and type(codes_urls[0][1]) is str):
raise TypeError
# check correct income data, values of codes and format url
if not (100_000 < codes_urls[0][0] < 9_999_999 and codes_urls[0][1].startswith('https://www.kant.ru/')):
raise ValueError
tasks = list()
products = list() # general solution list
all_urls = len(codes_urls)
if DEBUG:
now = tic()
tac = lambda: "{:.2f}sec".format(time.time() - now)
print('\r\n>>> Start parse_price at ', time.strftime('%H:%M:%S', time.localtime()))
for i, (code, url) in enumerate(codes_urls):
tasks.append(asyncio.create_task(get_and_parse(code, url)))
if len(tasks) == CHUNK or i+1 == all_urls:
if DEBUG:
print('\r{} sec, {}/ {}: {}\r'.format(tac(), i+1, all_urls, url), end='')
new = await asyncio.gather(*tasks)
new = [i for i in new if i[0] is not None]
products.extend(new)
await asyncio.sleep(TIMEOUT) # add value if you banned from kant.ru
tasks = list()
if DEBUG:
print('>>> End parse_price on {} sec. Parsed {} items.\n'.format(tac(), len(products)))
return products # [(code, price), (code, price)...]
@staticmethod
async def parse_available(codes: list) -> list:
    """For (code, instock_code) pairs, fetch per-shop size availability.

    Returns [(code, {shop: [(size, quantity), ...], ...}), ...] where
    shop is one of settings.SHOPS and sizes are converted towards the
    US scale. Raises TypeError when the first element is not an
    (int, int) tuple.
    """
    async def parse_instock(_code: int, _instock_code: int) -> tuple:
        # Fetch the small availability fragment for one product and parse
        # it into {shop: [(size, quantity), ...]}.
        async with aiohttp.ClientSession() as session:
            async with session.get(AVAILABLE, params={'ID': _instock_code}) as response:
                html = await response.text()
        if not html:
            return None, None
        tree = lxml_html.fromstring(html)
        shop_divs = tree.xpath("//div[@data-tab='tab958']/div")  # div class = popur__row
        tables = tree.xpath("//div[@data-tab='tab958']/table")
        table_index = 0
        shops = SHOPS
        in_stock = dict()
        for div in shop_divs:
            div_content = div.text_content().lower()
            shop = None
            if 'нагорная' in div_content:
                if 'нет в наличии' in div_content:
                    continue
                shop = shops[0]
            elif 'тимирязевская' in div_content:
                if 'нет в наличии' in div_content:
                    continue
                shop = shops[1]
            elif 'теплый стан' in div_content:
                if 'нет в наличии' in div_content:
                    continue
                shop = shops[2]
            elif 'алтуфьево' in div_content:
                if 'нет в наличии' in div_content:
                    break  # last known shop; nothing useful follows
                shop = shops[3]
            if shop is None or table_index >= len(tables):
                # BUG FIX: rows matching no shop keyword used to fall
                # through with a stale 'shop' value and consume a table,
                # misaligning the shop/table pairing; skip them instead.
                continue
            in_stock[shop] = list()
            table = tables[table_index]
            table_index += 1
            for row in table.xpath("tr"):
                row_list = row.text_content().split()
                row_count = len(row_list)
                if row_count == 2 or row_count == 3:  # size and availability found!
                    size_temp = row_list[0].lower()
                    # Special for 'Hoka': sizes come as 'US:11/12';
                    # keep the first variant only.
                    if '/' in size_temp:
                        size_temp = size_temp.partition('/')[0]
                    size = None
                    # Sizes are database-oriented (converted) to US size.
                    if size_temp.startswith('us'):
                        size = float(size_temp.split(':')[1].replace(',', '.', 1))
                    if size_temp.startswith('uk'):
                        # Transform UK to US size.
                        size = float(size_temp.split(':')[1].replace(',', '.', 1)) + 1
                    elif size_temp.startswith('k'):
                        # BUG FIX: was float(size[1:]) with size=None, which
                        # raised TypeError on every junior size; parse the
                        # number from the label text instead.
                        tail = size_temp.split(':')[1] if ':' in size_temp else size_temp[1:]
                        size = float(tail.replace(',', '.', 1))
                        # TODO convert junior sizes to the US scale as well
                    if size_temp.startswith('eur'):
                        size = float(size_temp.split(':')[1].replace(',', '.', 1))
                    in_stock[shop].append(
                        (size, int(row_list[1]) if row_count == 2 else int(row_list[2])))
        return _code, in_stock

    # Validate the first pair; the rest are assumed homogeneous.
    item = codes[0]
    if not (type(item) is tuple and type(item[0]) is int and type(item[1]) is int):
        raise TypeError
    tasks = list()
    products = list()  # general solution list
    count_codes = len(codes)
    if DEBUG:
        now = tic()

        def tac() -> str:  # elapsed time for the progress bar (PEP 8: def, not lambda)
            return "{:.2f}sec".format(time.time() - now)

        print('\r\n>>> Start parse_available at ', time.strftime('%H:%M:%S', time.localtime()))
    for i, (code, instock_code) in enumerate(codes):
        tasks.append(asyncio.create_task(parse_instock(code, instock_code)))
        if len(tasks) == CHUNK or i + 1 == count_codes:
            if DEBUG:
                print('\r{}, {}/ {} items; current code: {}\r'.format(
                    tac(), i + 1, count_codes, code), end='')
            new = await asyncio.gather(*tasks)
            products.extend(pair for pair in new if pair[0] is not None)
            # The fragment is tiny (json-like), so a reduced timeout is enough.
            await asyncio.sleep(TIMEOUT * 0.5)
            tasks = list()
    if DEBUG:
        print('>>> End parse_available on {} sec. Parsed {} items.\n'.format(tac(), len(products)))
    return products